From ddb35af02ac0bb868f0ad017aee52b5c15349980 Mon Sep 17 00:00:00 2001 From: huahaoyu Date: Fri, 16 Dec 2022 14:09:33 +0800 Subject: [PATCH 1/3] [BitSail#106][Connector] Migrate hadoop source connector to v1 interface. --- bitsail-connectors/connector-hadoop/pom.xml | 175 ++++++++++++++++++ .../hadoop/constant/HadoopConstants.java | 23 +++ .../hadoop/error/HadoopErrorCode.java | 51 +++++ .../hadoop/option/HadoopReaderOptions.java | 48 +++++ .../connector/hadoop/source/HadoopSource.java | 97 ++++++++++ .../hadoop/source/config/HadoopConf.java | 52 ++++++ .../HadoopSourceSplitCoordinator.java | 121 ++++++++++++ .../source/reader/HadoopSourceReader.java | 152 +++++++++++++++ .../source/split/HadoopSourceSplit.java | 35 ++++ .../bitsail-connector-unified-hadoop.json | 9 + .../hadoop/source/HadoopSourceITCase.java | 66 +++++++ .../test/resources/hadoop_to_print_json.json | 58 ++++++ .../resources/test_namespace/source/test.json | 1 + bitsail-connectors/pom.xml | 1 + bitsail-dist/pom.xml | 7 + .../connectors/hadoop/hadoop-v1-example.md | 108 +++++++++++ .../documents/connectors/hadoop/hadoop-v1.md | 129 +++++++++++++ .../connectors/hadoop/hadoop-v1-example.md | 108 +++++++++++ .../documents/connectors/hadoop/hadoop-v1.md | 128 +++++++++++++ 19 files changed, 1369 insertions(+) create mode 100644 bitsail-connectors/connector-hadoop/pom.xml create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json create mode 100644 bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java create mode 100644 bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json create mode 100644 bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json create mode 100644 website/en/documents/connectors/hadoop/hadoop-v1-example.md create mode 100644 website/en/documents/connectors/hadoop/hadoop-v1.md create mode 100644 website/zh/documents/connectors/hadoop/hadoop-v1-example.md create mode 100644 website/zh/documents/connectors/hadoop/hadoop-v1.md diff --git a/bitsail-connectors/connector-hadoop/pom.xml b/bitsail-connectors/connector-hadoop/pom.xml new file mode 100644 index 000000000..646019a26 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/pom.xml @@ -0,0 +1,175 @@ + + + + + + bitsail-connectors + com.bytedance.bitsail + ${revision} + + 
4.0.0 + + connector-hadoop + + + 8 + 8 + UTF-8 + + + + + + org.apache.hadoop + hadoop-common + + + log4j + log4j + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-core-asl + + + com.codahale.metrics + metrics-core + + + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + guice-servlet + com.google.inject.extensions + + + guice + com.google.inject + + + jersey-guice + com.sun.jersey.contribs + + + log4j + * + + + log4j + log4j + + + metrics-core + com.codahale.metrics + + + guava + com.google.guava + + + asm + asm + + + io.netty + netty + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-xc + + + org.codehaus.jackson + jackson-jaxrs + + + org.codehaus.jackson + jackson-core-asl + + + provided + + + + com.bytedance.bitsail + bitsail-component-format-csv + ${revision} + compile + + + + com.bytedance.bitsail + bitsail-component-format-json + ${revision} + compile + + + + + com.bytedance.bitsail + bitsail-connector-print + ${revision} + test + + + + com.bytedance.bitsail + connector-print + ${revision} + provided + + + + com.bytedance.bitsail + bitsail-connector-test + ${revision} + test + + + + org.apache.hadoop + hadoop-minicluster + ${hadoop.version} + test + + + guice + com.google.inject + + + guice-servlet + com.google.inject.extensions + + + + + diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java new file mode 100644 index 000000000..0dd973067 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java @@ -0,0 +1,23 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.constant; + +public class HadoopConstants { + public static String HADOOP_CONNECTOR_NAME = "hadoop"; + public static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; + public static final String SCHEMA = "hdfs"; +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java new file mode 100644 index 000000000..ad42bc5df --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.error; + +import com.bytedance.bitsail.common.exception.ErrorCode; + +public enum HadoopErrorCode implements ErrorCode { + + REQUIRED_VALUE("Hadoop-01", "You missed parameter which is required, please check your configuration."), + UNSUPPORTED_ENCODING("Hadoop-02", "Unsupported Encoding."), + UNSUPPORTED_COLUMN_TYPE("Hadoop-03", "Unsupported column type."), + HDFS_IO("Hadoop-04", "IO Exception."); + + private final String code; + private final String description; + + HadoopErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, + this.description); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java new file mode 100644 index 000000000..ae666f67d --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java @@ -0,0 +1,48 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.option; + +import com.bytedance.bitsail.common.annotation.Essential; +import com.bytedance.bitsail.common.option.ConfigOption; +import com.bytedance.bitsail.common.option.ReaderOptions; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; +import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX; + +public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions { + @Essential + ConfigOption DEFAULT_FS = + key(READER_PREFIX + "defaultFS") + .noDefaultValue(String.class); + @Essential + ConfigOption PATH_LIST = + key(READER_PREFIX + "path_list") + .noDefaultValue(String.class); + + @Essential + ConfigOption CONTENT_TYPE = + key(READER_PREFIX + "content_type") + .noDefaultValue(String.class); + + ConfigOption READER_PARALLELISM_NUM = + key(READER_PREFIX + "reader_parallelism_num") + .noDefaultValue(Integer.class); + + ConfigOption DEFAULT_HADOOP_PARALLELISM_THRESHOLD = + key(READER_PREFIX + "default_hadoop_parallelism_threshold") + .defaultValue(2); +} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java new file mode 100644 index 000000000..45e675456 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java @@ -0,0 +1,97 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.base.connector.reader.v1.Boundedness; +import com.bytedance.bitsail.base.connector.reader.v1.Source; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.base.execution.ExecutionEnviron; +import com.bytedance.bitsail.base.extension.ParallelismComputable; +import com.bytedance.bitsail.base.parallelism.ParallelismAdvice; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants; +import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.connector.hadoop.source.coordinator.HadoopSourceSplitCoordinator; +import com.bytedance.bitsail.connector.hadoop.source.reader.HadoopSourceReader; +import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class HadoopSource implements Source, ParallelismComputable { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSource.class); + + private BitSailConfiguration readerConfiguration; + private List hadoopPathList; + + @Override + public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException { + this.readerConfiguration = readerConfiguration; + hadoopPathList = Arrays.asList(readerConfiguration.getNecessaryOption(HadoopReaderOptions.PATH_LIST, HadoopErrorCode.REQUIRED_VALUE).split(",")); + } + + @Override + public Boundedness getSourceBoundedness() { + return Boundedness.BOUNDEDNESS; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) { + return new HadoopSourceReader(readerConfiguration, readerContext); + } + + @Override + public SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator.Context coordinatorContext) { + return new HadoopSourceSplitCoordinator(readerConfiguration, coordinatorContext, hadoopPathList); + } + + @Override + public String getReaderName() { + return HadoopConstants.HADOOP_CONNECTOR_NAME; + } + + @Override + public TypeInfoConverter createTypeInfoConverter() { + return new BitSailTypeInfoConverter(); + } + + @Override + public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception { + int adviceParallelism; + if (selfConf.fieldExists(HadoopReaderOptions.READER_PARALLELISM_NUM)) { + adviceParallelism = selfConf.get(HadoopReaderOptions.READER_PARALLELISM_NUM); + } else { + int parallelismThreshold = selfConf.get(HadoopReaderOptions.DEFAULT_HADOOP_PARALLELISM_THRESHOLD); + adviceParallelism = Math.max(hadoopPathList.size() / parallelismThreshold, 1); + } + LOG.info("Parallelism for this job will set to {}.", adviceParallelism); + return ParallelismAdvice.builder() + .adviceParallelism(adviceParallelism) + .enforceDownStreamChain(true) + .build(); + } +} diff --git 
a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java new file mode 100644 index 000000000..d9f279ee1 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java @@ -0,0 +1,52 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source.config; + +import lombok.Data; +import org.apache.hadoop.conf.Configuration; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants.HDFS_IMPL; +import static com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants.SCHEMA; + +@Data +public class HadoopConf implements Serializable { + + private Map extraOptions = new HashMap<>(); + private String hdfsNameKey; + + public HadoopConf(String hdfsNameKey) { + this.hdfsNameKey = hdfsNameKey; + } + + public String getFsHdfsImpl() { + return HDFS_IMPL; + } + + public String getSchema() { + return SCHEMA; + } + + public void setExtraOptionsForConfiguration(Configuration configuration) { + if (!extraOptions.isEmpty()) { + extraOptions.forEach(configuration::set); + } + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java new file mode 100644 index 000000000..f998b2ce7 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java @@ -0,0 +1,121 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.source.coordinator; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; + +public class HadoopSourceSplitCoordinator implements SourceSplitCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceSplitCoordinator.class); + private final Context coordinatorContext; + private final BitSailConfiguration readerConfiguration; + private final HashSet assignedHadoopSplits; + private final List hadoopPathList; + private HashSet pendingHadoopSplits; + + public HadoopSourceSplitCoordinator(BitSailConfiguration readerConfiguration, + Context coordinatorContext, List hadoopPathList) { + this.coordinatorContext = coordinatorContext; + this.readerConfiguration = readerConfiguration; + this.hadoopPathList = hadoopPathList; + this.assignedHadoopSplits = Sets.newHashSet(); + } + + @Override + public void start() { + this.pendingHadoopSplits = Sets.newHashSet(); + hadoopPathList.forEach(k -> pendingHadoopSplits.add(new HadoopSourceSplit(k))); + int readerNum = coordinatorContext.totalParallelism(); + LOG.info("Found {} readers and {} splits.", readerNum, pendingHadoopSplits.size()); + if (readerNum > pendingHadoopSplits.size()) { + LOG.error("Reader number {} is larger than split number {}.", readerNum, pendingHadoopSplits.size()); + } + } + + @Override + public void addReader(int subtaskId) { + LOG.info("Found reader {}", subtaskId); + assignSplit(subtaskId); + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + if (!splits.isEmpty()) { + LOG.info("Source reader {} return splits {}.", subtaskId, splits); + pendingHadoopSplits.addAll(splits); + assignSplit(subtaskId); + } + } + + private void assignSplit(int subtaskId) { + ArrayList currentTaskSplits = new ArrayList<>(); + int readerNum = coordinatorContext.totalParallelism(); + if (readerNum == 1) { + // if parallelism == 1, we should assign all the splits to reader + currentTaskSplits.addAll(pendingHadoopSplits); + } else { + // if parallelism > 1, according to hashCode of splitId to determine which reader to allocate the current task + for (HadoopSourceSplit hadoopSourceSplit : pendingHadoopSplits) { + int readerIndex = getReaderIndex(hadoopSourceSplit.uniqSplitId(), readerNum); + if (readerIndex == subtaskId) { + currentTaskSplits.add(hadoopSourceSplit); + } + } + } + // assign splits + coordinatorContext.assignSplit(subtaskId, currentTaskSplits); + // save the state of assigned splits + assignedHadoopSplits.addAll(currentTaskSplits); + // remove the assigned splits from pending splits + currentTaskSplits.forEach(split -> pendingHadoopSplits.remove(split)); + LOG.info("SubTask {} is assigned to [{}]", subtaskId, currentTaskSplits.stream().map(HadoopSourceSplit::uniqSplitId).collect(Collectors.joining(","))); + coordinatorContext.signalNoMoreSplits(subtaskId); + LOG.info("Finish assigning splits reader {}", subtaskId); + } + + private int getReaderIndex(String splitId, int totalReaderNum) { + return (splitId.hashCode() & Integer.MAX_VALUE) % 
totalReaderNum; + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + + } + + @Override + public EmptyState snapshotState() throws Exception { + return new EmptyState(); + } + + @Override + public void close() { + + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java new file mode 100644 index 000000000..71791a3d4 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java @@ -0,0 +1,152 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source.reader; + +import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.enumerate.ContentType; +import com.bytedance.bitsail.base.format.DeserializationSchema; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema; +import com.bytedance.bitsail.component.format.json.JsonDeserializationSchema; +import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.connector.hadoop.source.config.HadoopConf; +import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; + +import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import static com.bytedance.bitsail.common.option.ReaderOptions.BaseReaderOptions.CONTENT_TYPE; + +public class HadoopSourceReader implements SourceReader { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceReader.class); + private static final long serialVersionUID = 1L; + private final BitSailConfiguration readerConfiguration; + private final DeserializationSchema deserializationSchema; + private final Context readerContext; + private final HadoopConf hadoopConf; + private final HashSet assignedHadoopSplits; + private final HashSet finishedHadoopSplits; + private boolean noMoreSplits; + + public 
HadoopSourceReader(BitSailConfiguration readerConfiguration, Context readerContext) { + this.readerConfiguration = readerConfiguration; + this.readerContext = readerContext; + this.assignedHadoopSplits = Sets.newHashSet(); + this.finishedHadoopSplits = Sets.newHashSet(); + this.noMoreSplits = false; + + String defaultFS = readerConfiguration.getNecessaryOption(HadoopReaderOptions.DEFAULT_FS, HadoopErrorCode.REQUIRED_VALUE); + this.hadoopConf = new HadoopConf(defaultFS); + ContentType contentType = ContentType.valueOf(readerConfiguration.getNecessaryOption(CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase()); + switch (contentType) { + case CSV: + deserializationSchema = + new CsvDeserializationSchema(readerConfiguration, readerContext.getTypeInfos(), readerContext.getFieldNames()); + break; + case JSON: + deserializationSchema = + new JsonDeserializationSchema(readerConfiguration, readerContext.getTypeInfos(), readerContext.getFieldNames()); + ; + break; + default: + throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType); + } + } + + @Override + public void start() { + + } + + @Override + public void pollNext(SourcePipeline pipeline) throws Exception { + for (HadoopSourceSplit sourceSplit : assignedHadoopSplits) { + String hadoopPath = sourceSplit.uniqSplitId(); + LOG.info("Start to process split: {}", hadoopPath); + Configuration conf = getConfiguration(hadoopConf); + FileSystem fs = FileSystem.get(conf); + Path filePath = new Path(hadoopPath); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) { + reader.lines().forEach(line -> { + try { + Row row = deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8)); + pipeline.output(row); + } catch (IOException e) { + throw BitSailException.asBitSailException(HadoopErrorCode.HDFS_IO, e); + } + }); + } + finishedHadoopSplits.add(sourceSplit); + } + assignedHadoopSplits.removeAll(finishedHadoopSplits); + } + + @Override + public void notifyNoMoreSplits() { + LOG.info("Subtask {} received no more split signal.", readerContext.getIndexOfSubtask()); + noMoreSplits = true; + } + + @Override + public void addSplits(List splitList) { + assignedHadoopSplits.addAll(splitList); + } + + @Override + public boolean hasMoreElements() { + if (noMoreSplits) { + return CollectionUtils.size(assignedHadoopSplits) != 0; + } + return true; + } + + private Configuration getConfiguration(HadoopConf hadoopConf) { + Configuration configuration = new Configuration(); + configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); + configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); + hadoopConf.setExtraOptionsForConfiguration(configuration); + return configuration; + } + + @Override + public List snapshotState(long checkpointId) { + return Collections.emptyList(); + } + + @Override + public void close() throws Exception { + + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java new file mode 100644 index 000000000..6dd0ffbe2 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java @@ -0,0 +1,35 @@ +/* + * Copyright 2022 Bytedance Ltd. 
and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source.split; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit; + +public class HadoopSourceSplit implements SourceSplit { + + private static final long serialVersionUID = 1L; + + private final String splitId; + + public HadoopSourceSplit(String splitId) { + this.splitId = splitId; + } + + @Override + public String uniqSplitId() { + return splitId; + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json b/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json new file mode 100644 index 000000000..5ec9bb9d9 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json @@ -0,0 +1,9 @@ +{ + "name": "bitsail-connector-unified-hadoop", + "classes": [ + "com.bytedance.bitsail.connector.hadoop.source.HadoopSource" + ], + "libs": [ + "connector-hadoop-${version}.jar" + ] +} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java new file mode 100644 index 000000000..4fd1cf3da --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java @@ -0,0 +1,66 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class HadoopSourceITCase { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceITCase.class); + private static FileSystem fs; + + @Before + public void setUp() throws IOException, InterruptedException { + MiniClusterUtil.setUp(); + fs = MiniClusterUtil.fileSystem; + } + + @After + public void close() { + MiniClusterUtil.shutdown(); + fs = null; + } + + @Test + public void testHadoopToPrintJson() throws Exception { + String localJsonFile = "test_namespace/source/test.json"; + String remoteJsonFile = "/test_namespace/source/test.json"; + Configuration conf = fs.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + ClassLoader classLoader = JobConfUtils.class.getClassLoader(); + fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_json.json"); + jobConf.set(HadoopReaderOptions.DEFAULT_FS, defaultFS); + jobConf.set(HadoopReaderOptions.PATH_LIST, remoteJsonFile); + EmbeddedFlinkCluster.submitJob(jobConf); + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json new file mode 100644 index 000000000..e62363870 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json @@ -0,0 +1,58 @@ +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.print.sink.PrintSink", + "content_type": "json", + "batch_size": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json b/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json new file mode 100644 index 000000000..1eb170ed1 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json @@ -0,0 +1 @@ 
+{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]} \ No newline at end of file diff --git a/bitsail-connectors/pom.xml b/bitsail-connectors/pom.xml index b9e4d7963..9a0748cfa 100644 --- a/bitsail-connectors/pom.xml +++ b/bitsail-connectors/pom.xml @@ -46,6 +46,7 @@ connector-druid connector-assert connector-selectdb + connector-hadoop diff --git a/bitsail-dist/pom.xml b/bitsail-dist/pom.xml index 285d542a1..4009c554f 100644 --- a/bitsail-dist/pom.xml +++ b/bitsail-dist/pom.xml @@ -272,6 +272,13 @@ ${revision} provided + + + com.bytedance.bitsail + connector-hadoop + ${revision} + provided + diff --git a/website/en/documents/connectors/hadoop/hadoop-v1-example.md b/website/en/documents/connectors/hadoop/hadoop-v1-example.md new file mode 100644 index 000000000..914aba5ac --- /dev/null +++ b/website/en/documents/connectors/hadoop/hadoop-v1-example.md @@ -0,0 +1,108 @@ +# Hadoop connector examples + +Parent document: [hadoop-connector](./hadoop.md) + +The following configuration shows how to organize parameter configuration to read the following json format hdfs file. + +- Example json data +```json +{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]} +``` + + +- The `local Hadoop environment` is used for testing, and the sample json data is uploaded to the `/test_namespace/source directory` of hdfs +- Configuration file used to read the above hdfs file: + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +The following configuration shows how to organize parameter configuration to read the following csv format hdfs file. 
+ +- Example csv data + +```csv +1,100001,100.001,text_0001,2020-01-01 +2,100002,100.002,text_0002,2020-01-02 +``` + + +- The `local Hadoop environment` is used for testing, and the sample json data is uploaded to the `/test_namespace/source directory` of hdfs +- Configuration file used to read the above hdfs file: + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.csv", + "content_type":"csv", + "reader_parallelism_num": 1, + "columns": [ + { + "name": "id", + "type": "long" + }, + { + "name": "int_type", + "type": "int" + }, + { + "name": "double_type", + "type": "double" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "p_date", + "type": "date" + } + ] + } + } +} +``` + diff --git a/website/en/documents/connectors/hadoop/hadoop-v1.md b/website/en/documents/connectors/hadoop/hadoop-v1.md new file mode 100644 index 000000000..13bd343c3 --- /dev/null +++ b/website/en/documents/connectors/hadoop/hadoop-v1.md @@ -0,0 +1,129 @@ +# Hadoop connector + +Parent document: [connectors](../introduction.md) + + +## Main function + +Hadoop connector can be used to read hdfs files in batch scenarios. Its function points mainly include: + + - Support reading files in multiple hdfs directories at the same time + - Support reading hdfs files of various formats + +## Maven dependency + +```text + + com.bytedance.bitsail + connector-hadoop + ${revision} + +``` + +## Supported data types + - Basic data types supported by Hadoop connectors: + - Integer type: + - short + - int + - long + - biginterger + - Float type: + - float + - double + - bigdecimal + - Time type: + - timestamp + - date + - time + - String type: + - string + - Bool type: + - boolean + - Binary type: + - binary +- Composited data types supported by Hadoop connectors: + - map + - list + +## Parameters + +The following mentioned parameters should be added to `job.reader` block when using, for example: + +```json +{ + "job": { + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +### Necessary parameters + +| Param name | Required | Optional value | Description | +| :----------- | :------- | :------------- | :----------------------------------------------------------- | +| class | Yes | | Class name of hadoop connector, v1 connector is `com.bytedance.bitsail.connector.hadoop.source.HadoopSource` | +| defaultFS | Yes | | NameNode URI | +| path_list | Yes | | Specifies the path of the read in file. Multiple paths can be specified, separated by `','` | +| content_type | Yes | JSON
CSV | Specify the format of the read in file. | +| columns | Yes | | Describing fields' names and types | + +### Optional parameters +| Param name | Required | Optional value | Description | +| :----------------------------------- | :------- | :------------- | :-------------------------------------------------------- | +| default_hadoop_parallelism_threshold | No | | The number of files read by each reader, the default is 2 | +| reader_parallelism_num | No | | Reader parallelism | + + +## Supported format + +Support the following formats: + +- [JSON](#jump_json) +- [CSV](#jump_csv) + +### JSON +It supports parsing text files in json format. Each line is required to be a standard json string. + +### CSV +Support parsing of text files in csv format. Each line is required to be a standard csv string. + +The following parameters are supported to adjust the csv parsing style: + + +| Parameter name | Default value | Description | +|-----------------------------------|---------------|----------------------------------------------------------------------------| +| `job.common.csv_delimiter` | `','` | csv delimiter | +| `job.common.csv_escape` | | escape character | +| `job.common.csv_quote` | | quote character | +| `job.common.csv_with_null_string` | | Specify the conversion value of null field. It is not converted by default | + +---- + + +## Related document + +Configuration examples: [hadoop-v1-connector-example](./hadoop-v1-example.md) diff --git a/website/zh/documents/connectors/hadoop/hadoop-v1-example.md b/website/zh/documents/connectors/hadoop/hadoop-v1-example.md new file mode 100644 index 000000000..625622868 --- /dev/null +++ b/website/zh/documents/connectors/hadoop/hadoop-v1-example.md @@ -0,0 +1,108 @@ +# Hadoop连接器使用示例 + +上级文档: [hadoop连接器](./hadoop_zh.md) + +下面展示了如何使用用户参数配置读取如下json格式hdfs文件。 + +- 示例json数据 +```json +{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]} +``` + + +- 测试时使用本地Hadoop环境,需自行配置,将示例json数据上传至hdfs的/test_namespace/source目录下 +- 用于读取上述格式hdfs文件的配置 + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +下面展示了如何使用用户参数配置读取如下csv格式hdfs文件。 + +- 示例csv数据 + +```csv +1,100001,100.001,text_0001,2020-01-01 +2,100002,100.002,text_0002,2020-01-02 +``` + + +- 测试时使用本地Hadoop环境,需自行配置,将示例json数据上传至hdfs的/test_namespace/source目录下 +- 用于读取上述格式hdfs文件的配置 + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.csv", + "content_type":"csv", + "reader_parallelism_num": 1, + "columns": [ + { + "name": "id", + "type": "long" + }, + { + "name": "int_type", + "type": "int" + }, + { + "name": "double_type", + "type": "double" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "p_date", + 
"type": "date" + } + ] + } + } +} +``` + diff --git a/website/zh/documents/connectors/hadoop/hadoop-v1.md b/website/zh/documents/connectors/hadoop/hadoop-v1.md new file mode 100644 index 000000000..ff4a0458f --- /dev/null +++ b/website/zh/documents/connectors/hadoop/hadoop-v1.md @@ -0,0 +1,128 @@ +# Hadoop连接器 + +上级文档: [connectors](../introduction_zh.md) + + +## 主要功能 + +Hadoop连接器可用于批式场景下的hdfs文件读取。其功能点主要包括: + + - 支持同时读取多个hdfs目录下的文件 + - 支持读取多种格式的hdfs文件 + +## 依赖引入 + +```text + + com.bytedance.bitsail + connector-hadoop + ${revision} + +``` + +## 支持的数据类型 + - 支持的基础数据类型如下: + - 整数类型: + - short + - int + - long + - bitinteger + - 浮点类型: + - float + - double + - bigdecimal + - 时间类型: + - timestamp + - date + - time + - 字符类型: + - string + - 布尔类型: + - boolean + - 二进制类型: + - binary + - 支持的复杂数据类型包括: + - map + - list + +## 主要参数 + +以下参数使用在`job.reader`配置中,实际使用时请注意路径前缀。示例: +```json +{ + "job": { + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +### 必需参数 + +| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 | +| :----------- | :----------- | :---------- | :----------------------------------------------------------- | +| class | 是 | | Hadoop读连接器类名,v1 connector为为`com.bytedance.bitsail.connector.hadoop.source.HadoopSource` | +| defaultFS | 是 | | NameNode URI | +| path_list | 是 | | 指定读入文件的路径。可指定多个路径,使用`','`分隔 | +| content_type | 是 | JSON
CSV | 指定读入文件的格式 | +| columns | 是 | | 数据字段名称及类型 | + +### 可选参数 +| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 | +| :----------------------------------- | :----------- | :--------- | :-------------------------------- | +| default_hadoop_parallelism_threshold | 否 | | 每个reader读取的文件数目,默认为2 | +| reader_parallelism_num | 否 | | 读并发数,无默认值 | + + +## 支持的文件格式 + +支持对以下格式的文件进行解读: + +- [JSON](#jump_json) +- [CSV](#jump_csv) + +### JSON +支持对json格式的文本文件进行解析,要求每行均为标准的json字符串。 + +### CSV +支持对csv格式的文本文件进行解析,要求每行均为标准的csv字符串。 +支持以下参数对csv解析方式进行调整: + + +| 参数名称 | 参数默认值 | 参数说明 | +|-----------------------------------|-------|--------------------| +| `job.common.csv_delimiter` | `','` | csv分隔符 | +| `job.common.csv_escape` | | escape字符 | +| `job.common.csv_quote` | | quote字符 | +| `job.common.csv_with_null_string` | | 指定null字段的转化值,默认不转化 | + +---- + + +## 相关文档 + +配置示例文档 [hadoop v1连接器示例](./hadoop-v1-example.md) + From ceef217f2f0e53157c556085bc6850c2b3b283dc Mon Sep 17 00:00:00 2001 From: huahaoyu Date: Mon, 9 Jan 2023 11:36:53 +0800 Subject: [PATCH 2/3] [BitSail#106][Connector] Migrate hadoop source connector to v1 interface & support more InputFormat. --- bitsail-connectors/connector-hadoop/pom.xml | 20 +++ .../hadoop/constant/HadoopConstants.java | 2 - .../HiveInputFormatDeserializationSchema.java | 148 ++++++++++++++++++ .../TextInputFormatDeserializationSchema.java | 70 +++++++++ .../hadoop/option/HadoopReaderOptions.java | 11 +- .../connector/hadoop/source/HadoopSource.java | 6 +- .../hadoop/source/config/HadoopConf.java | 52 ------ .../HadoopSourceSplitCoordinator.java | 38 ++++- .../source/reader/HadoopSourceReader.java | 118 +++++++------- .../source/split/HadoopSourceSplit.java | 78 ++++++++- .../source/HadoopOrcInputFormatITCase.java | 67 ++++++++ .../HadoopParquetInputFormatITCase.java | 68 ++++++++ ....java => HadoopTextInputFormatITCase.java} | 11 +- .../test/resources/hadoop_to_print_hive.json | 56 +++++++ ...nt_json.json => hadoop_to_print_text.json} | 2 - .../resources/test_namespace/source/test_orc | Bin 0 -> 533 bytes .../test_namespace/source/test_parquet | Bin 0 -> 620 bytes website/en/documents/connectors/README.md | 1 + .../hadoop/{ => v1}/hadoop-v1-example.md | 6 +- .../connectors/hadoop/{ => v1}/hadoop-v1.md | 15 +- website/zh/documents/connectors/README.md | 1 + .../hadoop/{ => v1}/hadoop-v1-example.md | 6 +- .../connectors/hadoop/{ => v1}/hadoop-v1.md | 61 ++++---- 23 files changed, 647 insertions(+), 190 deletions(-) create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java delete mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java create mode 100644 bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java create mode 100644 bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java rename bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/{HadoopSourceITCase.java => HadoopTextInputFormatITCase.java} (91%) create mode 100644 bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json rename bitsail-connectors/connector-hadoop/src/test/resources/{hadoop_to_print_json.json => 
hadoop_to_print_text.json} (92%) create mode 100644 bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_orc create mode 100644 bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_parquet rename website/en/documents/connectors/hadoop/{ => v1}/hadoop-v1-example.md (90%) rename website/en/documents/connectors/hadoop/{ => v1}/hadoop-v1.md (88%) rename website/zh/documents/connectors/hadoop/{ => v1}/hadoop-v1-example.md (90%) rename website/zh/documents/connectors/hadoop/{ => v1}/hadoop-v1.md (75%) diff --git a/bitsail-connectors/connector-hadoop/pom.xml b/bitsail-connectors/connector-hadoop/pom.xml index 646019a26..1a2f20947 100644 --- a/bitsail-connectors/connector-hadoop/pom.xml +++ b/bitsail-connectors/connector-hadoop/pom.xml @@ -148,6 +148,26 @@ provided + + com.bytedance.bitsail + bitsail-shaded-hive + ${revision} + + + org.apache.ant + ant + + + log4j + log4j + + + commons-net + commons-net + + + + com.bytedance.bitsail bitsail-connector-test diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java index 0dd973067..60075afd3 100644 --- a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java @@ -18,6 +18,4 @@ public class HadoopConstants { public static String HADOOP_CONNECTOR_NAME = "hadoop"; - public static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; - public static final String SCHEMA = "hdfs"; } diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java new file mode 100644 index 000000000..7327dc176 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java @@ -0,0 +1,148 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.format; + +import com.bytedance.bitsail.base.format.DeserializationSchema; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.model.ColumnInfo; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.typeinfo.TypeInfo; +import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; + +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; + +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +public class HiveInputFormatDeserializationSchema implements DeserializationSchema { + private final BitSailConfiguration deserializationConfiguration; + private final TypeInfo[] typeInfos; + private final String[] fieldNames; + private final StructObjectInspector inspector; + public HiveInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration, + TypeInfo[] typeInfos, + String[] fieldNames) { + + this.deserializationConfiguration = deserializationConfiguration; + this.typeInfos = typeInfos; + this.fieldNames = fieldNames; + + List columnInfos = deserializationConfiguration.get(HadoopReaderOptions.COLUMNS); + Properties p = new Properties(); + String columns = columnInfos.stream().map(ColumnInfo::getName).collect(Collectors.joining(",")); + String columnsTypes = columnInfos.stream().map(ColumnInfo::getType).collect(Collectors.joining(":")); + p.setProperty("columns", columns); + p.setProperty("columns.types", columnsTypes); + String inputFormatClass = deserializationConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS); + try { + switch (inputFormatClass) { + case "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat": { + OrcSerde serde = new OrcSerde(); + serde.initialize(new JobConf(), p); + this.inspector = (StructObjectInspector) serde.getObjectInspector(); + break; + } + case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat": { + ParquetHiveSerDe serde = new ParquetHiveSerDe(); + serde.initialize(new JobConf(), p); + this.inspector = (StructObjectInspector) serde.getObjectInspector(); + break; + } + default: + throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported input format class: " + inputFormatClass); + } + } catch (SerDeException e) { + throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_COLUMN_TYPE, "unsupported column information."); + } + } + + @Override + public Row 
deserialize(Writable message) { + int arity = fieldNames.length; + List fields = inspector.getAllStructFieldRefs(); + Row row = new Row(arity); + for (int i = 0; i < arity; ++i) { + Object writableData = inspector.getStructFieldData(message, fields.get(i)); + row.setField(i, getWritableValue(writableData)); + } + return row; + } + + @Override + public boolean isEndOfStream(Row nextElement) { + return false; + } + + private Object getWritableValue(Object writable) { + Object ret; + + if (writable == null) { + ret = null; + } else if (writable instanceof IntWritable) { + ret = ((IntWritable) writable).get(); + } else if (writable instanceof Text) { + ret = writable.toString(); + } else if (writable instanceof LongWritable) { + ret = ((LongWritable) writable).get(); + } else if (writable instanceof ByteWritable) { + ret = ((ByteWritable) writable).get(); + } else if (writable instanceof DateWritable) { + ret = ((DateWritable) writable).get(); + } else if (writable instanceof DoubleWritable) { + ret = ((DoubleWritable) writable).get(); + } else if (writable instanceof TimestampWritable) { + ret = ((TimestampWritable) writable).getTimestamp(); + } else if (writable instanceof FloatWritable) { + ret = ((FloatWritable) writable).get(); + } else if (writable instanceof BooleanWritable) { + ret = ((BooleanWritable) writable).get(); + } else if (writable instanceof BytesWritable) { + BytesWritable bytesWritable = (BytesWritable) writable; + byte[] bytes = bytesWritable.getBytes(); + ret = new byte[bytesWritable.getLength()]; + System.arraycopy(bytes, 0, ret, 0, bytesWritable.getLength()); + } else if (writable instanceof HiveDecimalWritable) { + ret = ((HiveDecimalWritable) writable).getHiveDecimal().bigDecimalValue(); + } else if (writable instanceof ShortWritable) { + ret = ((ShortWritable) writable).get(); + } else { + ret = writable.toString(); + } + return ret; + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java new file mode 100644 index 000000000..27a7f7cbf --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java @@ -0,0 +1,70 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.format; + +import com.bytedance.bitsail.base.enumerate.ContentType; +import com.bytedance.bitsail.base.format.DeserializationSchema; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.typeinfo.TypeInfo; +import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema; +import com.bytedance.bitsail.component.format.json.JsonDeserializationSchema; +import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; + +import org.apache.hadoop.io.Writable; + +public class TextInputFormatDeserializationSchema implements DeserializationSchema { + private final BitSailConfiguration deserializationConfiguration; + private final TypeInfo[] typeInfos; + private final String[] fieldNames; + private final DeserializationSchema deserializationSchema; + + public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration, + TypeInfo[] typeInfos, + String[] fieldNames) { + this.deserializationConfiguration = deserializationConfiguration; + this.typeInfos = typeInfos; + this.fieldNames = fieldNames; + + ContentType contentType = ContentType.valueOf( + deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase()); + switch (contentType) { + case CSV: + this.deserializationSchema = + new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames); + break; + case JSON: + this.deserializationSchema = + new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames); + break; + default: + throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType); + } + } + + @Override + public Row deserialize(Writable message) { + return deserializationSchema.deserialize((message.toString()).getBytes()); + } + + @Override + public boolean isEndOfStream(Row nextElement) { + return false; + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java index ae666f67d..74773437c 100644 --- a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java @@ -25,15 +25,10 @@ public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions { @Essential - ConfigOption DEFAULT_FS = - key(READER_PREFIX + "defaultFS") - .noDefaultValue(String.class); - @Essential ConfigOption PATH_LIST = key(READER_PREFIX + "path_list") .noDefaultValue(String.class); - @Essential ConfigOption CONTENT_TYPE = key(READER_PREFIX + "content_type") .noDefaultValue(String.class); @@ -45,4 +40,8 @@ public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions { ConfigOption DEFAULT_HADOOP_PARALLELISM_THRESHOLD = key(READER_PREFIX + "default_hadoop_parallelism_threshold") .defaultValue(2); -} \ No newline at end of file + + ConfigOption HADOOP_INPUT_FORMAT_CLASS = + key(READER_PREFIX + "hadoop_inputformat_class") + 
.defaultValue("org.apache.hadoop.mapred.TextInputFormat"); +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java index 45e675456..19d5d6119 100644 --- a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java @@ -42,7 +42,7 @@ import java.util.Arrays; import java.util.List; -public class HadoopSource implements Source, ParallelismComputable { +public class HadoopSource implements Source, ParallelismComputable { private static final Logger LOG = LoggerFactory.getLogger(HadoopSource.class); private BitSailConfiguration readerConfiguration; @@ -61,12 +61,12 @@ public Boundedness getSourceBoundedness() { @Override public SourceReader createReader(SourceReader.Context readerContext) { - return new HadoopSourceReader(readerConfiguration, readerContext); + return new HadoopSourceReader(readerConfiguration, readerContext, hadoopPathList); } @Override public SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator.Context coordinatorContext) { - return new HadoopSourceSplitCoordinator(readerConfiguration, coordinatorContext, hadoopPathList); + return new HadoopSourceSplitCoordinator(readerConfiguration, coordinatorContext, hadoopPathList); } @Override diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java deleted file mode 100644 index d9f279ee1..000000000 --- a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.hadoop.source.config; - -import lombok.Data; -import org.apache.hadoop.conf.Configuration; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import static com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants.HDFS_IMPL; -import static com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants.SCHEMA; - -@Data -public class HadoopConf implements Serializable { - - private Map extraOptions = new HashMap<>(); - private String hdfsNameKey; - - public HadoopConf(String hdfsNameKey) { - this.hdfsNameKey = hdfsNameKey; - } - - public String getFsHdfsImpl() { - return HDFS_IMPL; - } - - public String getSchema() { - return SCHEMA; - } - - public void setExtraOptionsForConfiguration(Configuration configuration) { - if (!extraOptions.isEmpty()) { - extraOptions.forEach(configuration::set); - } - } -} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java index f998b2ce7..a5eeeb46b 100644 --- a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java @@ -19,40 +19,70 @@ import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; import com.google.common.collect.Sets; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; -public class HadoopSourceSplitCoordinator implements SourceSplitCoordinator { +public class HadoopSourceSplitCoordinator implements SourceSplitCoordinator { private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceSplitCoordinator.class); private final Context coordinatorContext; private final BitSailConfiguration readerConfiguration; private final HashSet assignedHadoopSplits; - private final List hadoopPathList; private HashSet pendingHadoopSplits; + private final JobConf jobConf; + private final InputFormat mapredInputFormat; public HadoopSourceSplitCoordinator(BitSailConfiguration readerConfiguration, Context coordinatorContext, List hadoopPathList) { this.coordinatorContext = coordinatorContext; this.readerConfiguration = readerConfiguration; - this.hadoopPathList = hadoopPathList; + this.jobConf = new JobConf(); + for (String path : hadoopPathList) { + FileInputFormat.addInputPath(this.jobConf, new Path(path)); + } + String inputClassName = readerConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS); + Class inputClass; 
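+    // Instantiate the InputFormat named by HADOOP_INPUT_FORMAT_CLASS (org.apache.hadoop.mapred.TextInputFormat
+    // by default) reflectively, then bind it to the JobConf so it can compute splits over the configured input paths.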
+ try { + inputClass = Class.forName(inputClassName); + this.mapredInputFormat = (InputFormat) inputClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + ReflectionUtils.setConf(mapredInputFormat, jobConf); this.assignedHadoopSplits = Sets.newHashSet(); } @Override public void start() { this.pendingHadoopSplits = Sets.newHashSet(); - hadoopPathList.forEach(k -> pendingHadoopSplits.add(new HadoopSourceSplit(k))); + int parallelismThreshold = readerConfiguration.get(HadoopReaderOptions.DEFAULT_HADOOP_PARALLELISM_THRESHOLD); int readerNum = coordinatorContext.totalParallelism(); + int splitNum = readerNum * parallelismThreshold; + InputSplit[] splits; + try { + splits = mapredInputFormat.getSplits(jobConf, splitNum); + } catch (IOException e) { + throw new RuntimeException(e); + } + Arrays.stream(splits).forEach(split -> pendingHadoopSplits.add(new HadoopSourceSplit(split))); LOG.info("Found {} readers and {} splits.", readerNum, pendingHadoopSplits.size()); if (readerNum > pendingHadoopSplits.size()) { LOG.error("Reader number {} is larger than split number {}.", readerNum, pendingHadoopSplits.size()); diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java index 71791a3d4..37cdc018a 100644 --- a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java @@ -18,70 +18,82 @@ import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline; import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; -import com.bytedance.bitsail.base.enumerate.ContentType; import com.bytedance.bitsail.base.format.DeserializationSchema; -import com.bytedance.bitsail.common.BitSailException; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema; -import com.bytedance.bitsail.component.format.json.JsonDeserializationSchema; -import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode; +import com.bytedance.bitsail.connector.hadoop.format.HiveInputFormatDeserializationSchema; +import com.bytedance.bitsail.connector.hadoop.format.TextInputFormatDeserializationSchema; import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; -import com.bytedance.bitsail.connector.hadoop.source.config.HadoopConf; import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import 
org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; import java.util.List; -import static com.bytedance.bitsail.common.option.ReaderOptions.BaseReaderOptions.CONTENT_TYPE; - -public class HadoopSourceReader implements SourceReader { +public class HadoopSourceReader implements SourceReader { private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceReader.class); private static final long serialVersionUID = 1L; private final BitSailConfiguration readerConfiguration; - private final DeserializationSchema deserializationSchema; private final Context readerContext; - private final HadoopConf hadoopConf; private final HashSet assignedHadoopSplits; private final HashSet finishedHadoopSplits; private boolean noMoreSplits; - - public HadoopSourceReader(BitSailConfiguration readerConfiguration, Context readerContext) { + protected JobConf jobConf; + private InputFormat mapredInputFormat; + private K key; + private V value; + private DeserializationSchema deserializationSchema; + private RecordReader recordReader; + + public HadoopSourceReader(BitSailConfiguration readerConfiguration, Context readerContext, List hadoopPathList) { this.readerConfiguration = readerConfiguration; this.readerContext = readerContext; this.assignedHadoopSplits = Sets.newHashSet(); this.finishedHadoopSplits = Sets.newHashSet(); + this.jobConf = new JobConf(); + for (String path : hadoopPathList) { + FileInputFormat.addInputPath(this.jobConf, new Path(path)); + } + + String inputClassName = readerConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS); + Class inputClass; + try { + inputClass = Class.forName(inputClassName); + this.mapredInputFormat = (InputFormat) inputClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + ReflectionUtils.setConf(mapredInputFormat, jobConf); this.noMoreSplits = false; - String defaultFS = readerConfiguration.getNecessaryOption(HadoopReaderOptions.DEFAULT_FS, HadoopErrorCode.REQUIRED_VALUE); - this.hadoopConf = new HadoopConf(defaultFS); - ContentType contentType = ContentType.valueOf(readerConfiguration.getNecessaryOption(CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase()); - switch (contentType) { - case CSV: - deserializationSchema = - new CsvDeserializationSchema(readerConfiguration, readerContext.getTypeInfos(), readerContext.getFieldNames()); - break; - case JSON: - deserializationSchema = - new JsonDeserializationSchema(readerConfiguration, readerContext.getTypeInfos(), readerContext.getFieldNames()); - ; - break; - default: - throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType); + if (this.mapredInputFormat instanceof TextInputFormat) { + deserializationSchema = new TextInputFormatDeserializationSchema( + readerConfiguration, + readerContext.getTypeInfos(), + readerContext.getFieldNames()); + } else if (this.mapredInputFormat instanceof MapredParquetInputFormat + || this.mapredInputFormat instanceof OrcInputFormat) { + deserializationSchema = new HiveInputFormatDeserializationSchema( + readerConfiguration, + readerContext.getTypeInfos(), + readerContext.getFieldNames()); } } @@ -93,20 +105,18 @@ public void start() { @Override public void pollNext(SourcePipeline 
pipeline) throws Exception { for (HadoopSourceSplit sourceSplit : assignedHadoopSplits) { - String hadoopPath = sourceSplit.uniqSplitId(); - LOG.info("Start to process split: {}", hadoopPath); - Configuration conf = getConfiguration(hadoopConf); - FileSystem fs = FileSystem.get(conf); - Path filePath = new Path(hadoopPath); - try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) { - reader.lines().forEach(line -> { - try { - Row row = deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8)); - pipeline.output(row); - } catch (IOException e) { - throw BitSailException.asBitSailException(HadoopErrorCode.HDFS_IO, e); - } - }); + + sourceSplit.initInputSplit(jobConf); + LOG.info("Start to process split: {}", sourceSplit.uniqSplitId()); + this.recordReader = this.mapredInputFormat.getRecordReader(sourceSplit.getHadoopInputSplit(), jobConf, Reporter.NULL); + if (this.recordReader instanceof Configurable) { + ((Configurable) this.recordReader).setConf(jobConf); + } + key = this.recordReader.createKey(); + value = this.recordReader.createValue(); + while (this.recordReader.next(key, value)) { + Row row = deserializationSchema.deserialize((Writable) value); + pipeline.output(row); } finishedHadoopSplits.add(sourceSplit); } @@ -132,14 +142,6 @@ public boolean hasMoreElements() { return true; } - private Configuration getConfiguration(HadoopConf hadoopConf) { - Configuration configuration = new Configuration(); - configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); - configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); - hadoopConf.setExtraOptionsForConfiguration(configuration); - return configuration; - } - @Override public List snapshotState(long checkpointId) { return Collections.emptyList(); @@ -147,6 +149,6 @@ public List snapshotState(long checkpointId) { @Override public void close() throws Exception { - + recordReader.close(); } } diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java index 6dd0ffbe2..5b25b993e 100644 --- a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java @@ -18,18 +18,84 @@ import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit; -public class HadoopSourceSplit implements SourceSplit { +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import static com.google.common.base.Preconditions.checkNotNull; + +public class HadoopSourceSplit implements SourceSplit { private static final long serialVersionUID = 1L; + private final Class splitType; + private transient InputSplit hadoopInputSplit; + private byte[] hadoopInputSplitByteArray; + + public HadoopSourceSplit(InputSplit inputSplit) { + if (inputSplit == null) { + throw new 
NullPointerException("Hadoop input split must not be null"); + } + + this.splitType = inputSplit.getClass(); + this.hadoopInputSplit = inputSplit; + } + + public InputSplit getHadoopInputSplit() { + return this.hadoopInputSplit; + } + + public void initInputSplit(JobConf jobConf) { + if (this.hadoopInputSplit != null) { + return; + } + + checkNotNull(hadoopInputSplitByteArray); + + try { + this.hadoopInputSplit = (InputSplit) WritableFactories.newInstance(splitType); + + if (this.hadoopInputSplit instanceof Configurable) { + ((Configurable) this.hadoopInputSplit).setConf(jobConf); + } else if (this.hadoopInputSplit instanceof JobConfigurable) { + ((JobConfigurable) this.hadoopInputSplit).configure(jobConf); + } + + if (hadoopInputSplitByteArray != null) { + try (ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(hadoopInputSplitByteArray))) { + this.hadoopInputSplit.readFields(objectInputStream); + } + + this.hadoopInputSplitByteArray = null; + } + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e); + } + } - private final String splitId; + private void writeObject(ObjectOutputStream out) throws IOException { - public HadoopSourceSplit(String splitId) { - this.splitId = splitId; + if (hadoopInputSplit != null) { + try ( + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream) + ) { + this.hadoopInputSplit.write(objectOutputStream); + objectOutputStream.flush(); + this.hadoopInputSplitByteArray = byteArrayOutputStream.toByteArray(); + } + } + out.defaultWriteObject(); } @Override public String uniqSplitId() { - return splitId; + return hadoopInputSplit.toString(); } -} \ No newline at end of file +} diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java new file mode 100644 index 000000000..3f953983b --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java @@ -0,0 +1,67 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class HadoopOrcInputFormatITCase { + private static final Logger LOG = LoggerFactory.getLogger(HadoopOrcInputFormatITCase.class); + private static FileSystem fs; + + @Before + public void setUp() throws IOException, InterruptedException { + MiniClusterUtil.setUp(); + fs = MiniClusterUtil.fileSystem; + } + + @After + public void close() { + MiniClusterUtil.shutdown(); + fs = null; + } + + @Test + public void testHadoopToPrintOrc() throws Exception { + String localJsonFile = "test_namespace/source/test_orc"; + String remoteJsonFile = "/test_namespace/source/test_orc"; + Configuration conf = fs.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + ClassLoader classLoader = JobConfUtils.class.getClassLoader(); + fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); + String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); + jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + remoteJsonFile); + EmbeddedFlinkCluster.submitJob(jobConf); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java new file mode 100644 index 000000000..6e69f3900 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java @@ -0,0 +1,68 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class HadoopParquetInputFormatITCase { + private static final Logger LOG = LoggerFactory.getLogger(HadoopParquetInputFormatITCase.class); + + private static FileSystem fs; + + @Before + public void setUp() throws IOException, InterruptedException { + MiniClusterUtil.setUp(); + fs = MiniClusterUtil.fileSystem; + } + + @After + public void close() { + MiniClusterUtil.shutdown(); + fs = null; + } + + @Test + public void testHadoopToPrintParquet() throws Exception { + String localJsonFile = "test_namespace/source/test_parquet"; + String remoteJsonFile = "/test_namespace/source/test_parquet"; + Configuration conf = fs.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + ClassLoader classLoader = JobConfUtils.class.getClassLoader(); + fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); + String inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); + jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + remoteJsonFile); + EmbeddedFlinkCluster.submitJob(jobConf); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java similarity index 91% rename from bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java rename to bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java index 4fd1cf3da..a06a7c05a 100644 --- a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java +++ b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java @@ -33,8 +33,8 @@ import java.io.IOException; -public class HadoopSourceITCase { - private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceITCase.class); +public class HadoopTextInputFormatITCase { + private static final Logger LOG = LoggerFactory.getLogger(HadoopTextInputFormatITCase.class); private static FileSystem fs; @Before @@ -58,9 +58,8 @@ public void testHadoopToPrintJson() throws Exception { LOG.info("fs.defaultFS: {}", defaultFS); ClassLoader classLoader = JobConfUtils.class.getClassLoader(); fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); - BitSailConfiguration jobConf = 
JobConfUtils.fromClasspath("hadoop_to_print_json.json"); - jobConf.set(HadoopReaderOptions.DEFAULT_FS, defaultFS); - jobConf.set(HadoopReaderOptions.PATH_LIST, remoteJsonFile); + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_text.json"); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + remoteJsonFile); EmbeddedFlinkCluster.submitJob(jobConf); } -} \ No newline at end of file +} diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json new file mode 100644 index 000000000..9f013c6f9 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json @@ -0,0 +1,56 @@ +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"json", + "reader_parallelism_num": 1, + "columns":[ + { + "name":"id_card", + "type":"int" + }, + { + "name":"tran_time", + "type":"string" + }, + { + "name":"name", + "type":"string" + }, + { + "name":"cash", + "type":"int" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.print.sink.PrintSink", + "content_type": "json", + "batch_size": 1, + "columns":[ + { + "name":"id_card", + "type":"int" + }, + { + "name":"tran_time", + "type":"string" + }, + { + "name":"name", + "type":"string" + }, + { + "name":"cash", + "type":"int" + } + ] + } + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_text.json similarity index 92% rename from bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json rename to bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_text.json index e62363870..8c0c7aa7a 100644 --- a/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json +++ b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_text.json @@ -8,8 +8,6 @@ }, "reader": { "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", - "defaultFS": "hdfs://127.0.0.1:9000/", - "path_list": "/test_namespace/source/test.json", "content_type":"json", "reader_parallelism_num": 1, "columns": [ diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_orc b/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_orc new file mode 100644 index 0000000000000000000000000000000000000000..4017688d46db8baf3bfbd05684ef6613c9af0a9b GIT binary patch literal 533 zcmeYdau#G@;9?VE;b074Fa$EixR@Cj7=-va7=<`FF7OLn;Fq|-AHWd8!0>p}v!ql8 zMnz>lH&t1^L}g}W=2&L$SY>8bXJPGRW^LwR#hfOgEJhv%6QJTr5XFi>#d3PjCj5y} z6i_@Uc7TCF8)%6z#1bB$C2Sli0x1$H0SufB3~>zX7t|RTj0}tnbPX(ZjSLhFO{|O! ztPD)q85sN+gn?v8elC!7VmJU6NvUCA@I7V7)nFjPeBocnzNLDP6c}F$R{ei%%EsRQ zfy49swDgA;v6Q%=peuUp}LqGY|;{Dt*#8LkKIxi9pvOcH&IASqD@KOL31S&3lO8=w5POj@#UQlcNt}WCMKQt>Gb#0@tXKZN7L&}xBkg9zEhJXY&frQE zo)W& zUHJ{YiLC}qAeXu4o_liUkjeeCg%NJz<{cK{P{WF@%0Ci9$4U@9BD|82gTU~xSE4p- zYSS(qb!}=`uIEsX_SGt|B-8-8iZ#$~;Z{jodRZrGUP`;*0ReJp*@l5C=e5MnTP$j? 
z4>!wr#}`?!~(h%V~bl01dtrnX&;ceK-5qZUqO-_~rmMswxHC=l$ z6&*zTsHSKq5ezdjB~w!wU&Lq5bzJ}BpFdM<>;y=^khCR#Z_}lm` rG+gF0H)MR^`!sZU=m*hYWIK#ojx&nvVH7&iAPS;e^eG@Tf&c6mXdG%o literal 0 HcmV?d00001 diff --git a/website/en/documents/connectors/README.md b/website/en/documents/connectors/README.md index 3e2967a2e..1ab26ce3b 100644 --- a/website/en/documents/connectors/README.md +++ b/website/en/documents/connectors/README.md @@ -16,6 +16,7 @@ dir: - [FTP/SFTP connector](ftp/ftp.md) - [FTP/SFTP-v1 connector](ftp/v1/ftp-v1.md) - [Hadoop connector](hadoop/hadoop.md) +- [Hadoop-v1 connector](hadoop/v1/hadoop-v1.md) - [HBase connector](hbase/hbase.md) - [Hive connector](hive/hive.md) - [Hudi connector](hudi/hudi.md) diff --git a/website/en/documents/connectors/hadoop/hadoop-v1-example.md b/website/en/documents/connectors/hadoop/v1/hadoop-v1-example.md similarity index 90% rename from website/en/documents/connectors/hadoop/hadoop-v1-example.md rename to website/en/documents/connectors/hadoop/v1/hadoop-v1-example.md index 914aba5ac..c43805e08 100644 --- a/website/en/documents/connectors/hadoop/hadoop-v1-example.md +++ b/website/en/documents/connectors/hadoop/v1/hadoop-v1-example.md @@ -1,6 +1,6 @@ # Hadoop connector examples -Parent document: [hadoop-connector](./hadoop.md) +Parent document: [hadoop-connector](./hadoop-v1.md) The following configuration shows how to organize parameter configuration to read the following json format hdfs file. @@ -24,8 +24,6 @@ The following configuration shows how to organize parameter configuration to rea }, "reader": { "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", - "defaultFS": "hdfs://127.0.0.1:9000/", - "path_list": "/test_namespace/source/test.json", "content_type":"json", "reader_parallelism_num": 1, "columns": [ @@ -75,8 +73,6 @@ The following configuration shows how to organize parameter configuration to rea }, "reader": { "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", - "defaultFS": "hdfs://127.0.0.1:9000/", - "path_list": "/test_namespace/source/test.csv", "content_type":"csv", "reader_parallelism_num": 1, "columns": [ diff --git a/website/en/documents/connectors/hadoop/hadoop-v1.md b/website/en/documents/connectors/hadoop/v1/hadoop-v1.md similarity index 88% rename from website/en/documents/connectors/hadoop/hadoop-v1.md rename to website/en/documents/connectors/hadoop/v1/hadoop-v1.md index 13bd343c3..4a1d4a375 100644 --- a/website/en/documents/connectors/hadoop/hadoop-v1.md +++ b/website/en/documents/connectors/hadoop/v1/hadoop-v1.md @@ -1,6 +1,6 @@ # Hadoop connector -Parent document: [connectors](../introduction.md) +Parent document: [connectors](../../README.md) ## Main function @@ -54,8 +54,6 @@ The following mentioned parameters should be added to `job.reader` block when us "job": { "reader": { "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", - "defaultFS": "hdfs://127.0.0.1:9000/", - "path_list": "/test_namespace/source/test.json", "content_type":"json", "reader_parallelism_num": 1, "columns": [ @@ -76,7 +74,7 @@ The following mentioned parameters should be added to `job.reader` block when us "type": "list" } ] - } + }, } } ``` @@ -86,16 +84,15 @@ The following mentioned parameters should be added to `job.reader` block when us | Param name | Required | Optional value | Description | | :----------- | :------- | :------------- | :----------------------------------------------------------- | | class | Yes | | Class name of hadoop connector, v1 connector is 
`com.bytedance.bitsail.connector.hadoop.source.HadoopSource` | -| defaultFS | Yes | | NameNode URI | | path_list | Yes | | Specifies the path of the read in file. Multiple paths can be specified, separated by `','` | | content_type | Yes | JSON
CSV | Specify the format of the read in file. | | columns | Yes | | Describing fields' names and types | ### Optional parameters -| Param name | Required | Optional value | Description | -| :----------------------------------- | :------- | :------------- | :-------------------------------------------------------- | -| default_hadoop_parallelism_threshold | No | | The number of files read by each reader, the default is 2 | -| reader_parallelism_num | No | | Reader parallelism | +| Param name | Required | Optional value | Description | +| :----------------------------------- | :------- | :------------- | :--------------------------------------------------------- | +| default_hadoop_parallelism_threshold | No | | The number of splits read by each reader, the default is 2 | +| reader_parallelism_num | No | | Reader parallelism | ## Supported format diff --git a/website/zh/documents/connectors/README.md b/website/zh/documents/connectors/README.md index 70bac6e39..3a8e14a35 100644 --- a/website/zh/documents/connectors/README.md +++ b/website/zh/documents/connectors/README.md @@ -16,6 +16,7 @@ dir: - [FTP/SFTP 连接器](ftp/ftp.md) - [FTP/SFTP-v1 连接器](ftp/v1/ftp-v1.md) - [Hadoop 连接器](hadoop/hadoop.md) +- [Hadoop-v1 连接器](hadoop/v1/hadoop-v1.md) - [HBase 连接器](hbase/hbase.md) - [Hive 连接器](hive/hive.md) - [Hudi 连接器](hudi/hudi.md) diff --git a/website/zh/documents/connectors/hadoop/hadoop-v1-example.md b/website/zh/documents/connectors/hadoop/v1/hadoop-v1-example.md similarity index 90% rename from website/zh/documents/connectors/hadoop/hadoop-v1-example.md rename to website/zh/documents/connectors/hadoop/v1/hadoop-v1-example.md index 625622868..a709d0691 100644 --- a/website/zh/documents/connectors/hadoop/hadoop-v1-example.md +++ b/website/zh/documents/connectors/hadoop/v1/hadoop-v1-example.md @@ -1,6 +1,6 @@ # Hadoop连接器使用示例 -上级文档: [hadoop连接器](./hadoop_zh.md) +上级文档: [hadoop连接器](./hadoop-v1.md) 下面展示了如何使用用户参数配置读取如下json格式hdfs文件。 @@ -24,8 +24,6 @@ }, "reader": { "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", - "defaultFS": "hdfs://127.0.0.1:9000/", - "path_list": "/test_namespace/source/test.json", "content_type":"json", "reader_parallelism_num": 1, "columns": [ @@ -75,8 +73,6 @@ }, "reader": { "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", - "defaultFS": "hdfs://127.0.0.1:9000/", - "path_list": "/test_namespace/source/test.csv", "content_type":"csv", "reader_parallelism_num": 1, "columns": [ diff --git a/website/zh/documents/connectors/hadoop/hadoop-v1.md b/website/zh/documents/connectors/hadoop/v1/hadoop-v1.md similarity index 75% rename from website/zh/documents/connectors/hadoop/hadoop-v1.md rename to website/zh/documents/connectors/hadoop/v1/hadoop-v1.md index ff4a0458f..a3efe4587 100644 --- a/website/zh/documents/connectors/hadoop/hadoop-v1.md +++ b/website/zh/documents/connectors/hadoop/v1/hadoop-v1.md @@ -1,6 +1,6 @@ # Hadoop连接器 -上级文档: [connectors](../introduction_zh.md) +上级文档: [connectors](../../README.md) ## 主要功能 @@ -50,42 +50,39 @@ Hadoop连接器可用于批式场景下的hdfs文件读取。其功能点主要 以下参数使用在`job.reader`配置中,实际使用时请注意路径前缀。示例: ```json { - "job": { - "reader": { - "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", - "defaultFS": "hdfs://127.0.0.1:9000/", - "path_list": "/test_namespace/source/test.json", - "content_type":"json", - "reader_parallelism_num": 1, - "columns": [ - { - "name":"id", - "type": "int" - }, - { - "name": "string_type", - "type": "string" - }, - { - "name": "map_string_string", - "type": "map" - }, - { - "name": "array_string", - 
"type": "list" - } - ] - } - } + "job": { + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + }, + } } ``` ### 必需参数 -| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 | -| :----------- | :----------- | :---------- | :----------------------------------------------------------- | -| class | 是 | | Hadoop读连接器类名,v1 connector为为`com.bytedance.bitsail.connector.hadoop.source.HadoopSource` | -| defaultFS | 是 | | NameNode URI | +| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 | +|:-------------| :----------- | :---------- | :----------------------------------------------------------- | +| class | 是 | | Hadoop读连接器类名,v1 connector为为`com.bytedance.bitsail.connector.hadoop.source.HadoopSource` | | | path_list | 是 | | 指定读入文件的路径。可指定多个路径,使用`','`分隔 | | content_type | 是 | JSON
CSV | 指定读入文件的格式 | | columns | 是 | | 数据字段名称及类型 | From b87540854c4f421f4f4052d2cd09bd0fe9dda10e Mon Sep 17 00:00:00 2001 From: huahaoyu Date: Tue, 31 Jan 2023 21:21:59 +0800 Subject: [PATCH 3/3] fix pom conflict and rewrite hadoop source test. --- .../bitsail-connector-hadoop/pom.xml | 3 - bitsail-connectors/connector-hadoop/pom.xml | 117 ++---------------- .../source/HadoopOrcInputFormatITCase.java | 67 ---------- .../HadoopParquetInputFormatITCase.java | 68 ---------- .../hadoop/source/HadoopSourceITCase.java | 116 +++++++++++++++++ .../source/HadoopTextInputFormatITCase.java | 65 ---------- .../{test_namespace => }/source/test.json | 0 .../{test_namespace => }/source/test_orc | Bin .../{test_namespace => }/source/test_parquet | Bin 9 files changed, 129 insertions(+), 307 deletions(-) delete mode 100644 bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java delete mode 100644 bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java create mode 100644 bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java delete mode 100644 bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java rename bitsail-connectors/connector-hadoop/src/test/resources/{test_namespace => }/source/test.json (100%) rename bitsail-connectors/connector-hadoop/src/test/resources/{test_namespace => }/source/test_orc (100%) rename bitsail-connectors/connector-hadoop/src/test/resources/{test_namespace => }/source/test_parquet (100%) diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml index 3a89a1e11..a7752388a 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml @@ -26,9 +26,6 @@ bitsail-connector-hadoop - - - com.bytedance.bitsail diff --git a/bitsail-connectors/connector-hadoop/pom.xml b/bitsail-connectors/connector-hadoop/pom.xml index 1a2f20947..37182aafe 100644 --- a/bitsail-connectors/connector-hadoop/pom.xml +++ b/bitsail-connectors/connector-hadoop/pom.xml @@ -36,87 +36,27 @@ - org.apache.hadoop - hadoop-common - - - log4j - log4j - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-core-asl - - - com.codahale.metrics - metrics-core - - + com.bytedance.bitsail + bitsail-shaded-hadoop + ${revision} provided - org.apache.hadoop - hadoop-mapreduce-client-core + com.bytedance.bitsail + bitsail-shaded-hive + ${revision} + provided - guice-servlet - com.google.inject.extensions - - - guice - com.google.inject - - - jersey-guice - com.sun.jersey.contribs - - - log4j - * - - - log4j - log4j - - - metrics-core - com.codahale.metrics - - - guava - com.google.guava - - - asm - asm - - + netty-common io.netty - netty - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-xc - - - org.codehaus.jackson - jackson-jaxrs - org.codehaus.jackson - jackson-core-asl + netty-buffer + io.netty - provided @@ -145,27 +85,7 @@ com.bytedance.bitsail connector-print ${revision} - provided - - - - com.bytedance.bitsail - bitsail-shaded-hive - ${revision} - - - org.apache.ant - ant - - - log4j - log4j - - - commons-net - commons-net - - + test @@ -173,21 +93,10 @@ 
bitsail-connector-test ${revision} test - - - - org.apache.hadoop - hadoop-minicluster - ${hadoop.version} - test - guice - com.google.inject - - - guice-servlet - com.google.inject.extensions + hppc + com.carrotsearch diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java deleted file mode 100644 index 3f953983b..000000000 --- a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.hadoop.source; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; -import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; -import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; -import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class HadoopOrcInputFormatITCase { - private static final Logger LOG = LoggerFactory.getLogger(HadoopOrcInputFormatITCase.class); - private static FileSystem fs; - - @Before - public void setUp() throws IOException, InterruptedException { - MiniClusterUtil.setUp(); - fs = MiniClusterUtil.fileSystem; - } - - @After - public void close() { - MiniClusterUtil.shutdown(); - fs = null; - } - - @Test - public void testHadoopToPrintOrc() throws Exception { - String localJsonFile = "test_namespace/source/test_orc"; - String remoteJsonFile = "/test_namespace/source/test_orc"; - Configuration conf = fs.getConf(); - String defaultFS = conf.get("fs.defaultFS"); - LOG.info("fs.defaultFS: {}", defaultFS); - ClassLoader classLoader = JobConfUtils.class.getClassLoader(); - fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); - String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; - BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); - jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); - jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + remoteJsonFile); - EmbeddedFlinkCluster.submitJob(jobConf); - } -} diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java 
b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java deleted file mode 100644 index 6e69f3900..000000000 --- a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.hadoop.source; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; -import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; -import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; -import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class HadoopParquetInputFormatITCase { - private static final Logger LOG = LoggerFactory.getLogger(HadoopParquetInputFormatITCase.class); - - private static FileSystem fs; - - @Before - public void setUp() throws IOException, InterruptedException { - MiniClusterUtil.setUp(); - fs = MiniClusterUtil.fileSystem; - } - - @After - public void close() { - MiniClusterUtil.shutdown(); - fs = null; - } - - @Test - public void testHadoopToPrintParquet() throws Exception { - String localJsonFile = "test_namespace/source/test_parquet"; - String remoteJsonFile = "/test_namespace/source/test_parquet"; - Configuration conf = fs.getConf(); - String defaultFS = conf.get("fs.defaultFS"); - LOG.info("fs.defaultFS: {}", defaultFS); - ClassLoader classLoader = JobConfUtils.class.getClassLoader(); - fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); - String inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; - BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); - jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); - jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + remoteJsonFile); - EmbeddedFlinkCluster.submitJob(jobConf); - } -} diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java new file mode 100644 index 000000000..3e0c4c0bf --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java @@ -0,0 +1,116 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class HadoopSourceITCase { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceITCase.class); + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(new File("/tmp")); + + private static FileSystem FILESYSTEM; + File folder; + + @Before + public void setUp() throws IOException { + FILESYSTEM = LocalFileSystem.getLocal(new Configuration()); + folder = TEMP_FOLDER.newFolder(); + } + + @After + public void close() { + FILESYSTEM = null; + folder = null; + } + + @Test + public void testHadoopToPrintJson() throws Exception { + Path source = Paths.get(HadoopSourceITCase.class.getClassLoader() + .getResource("source/test.json") + .toURI() + .getPath()); + + Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString()); + Files.copy(source, target); + Configuration conf = FILESYSTEM.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_text.json"); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target); + EmbeddedFlinkCluster.submitJob(jobConf); + } + + @Test + public void testHadoopToPrintParquet() throws Exception { + Path source = Paths.get(HadoopSourceITCase.class.getClassLoader() + .getResource("source/test_parquet") + .toURI() + .getPath()); + + Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString()); + Files.copy(source, target); + Configuration conf = FILESYSTEM.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + String inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); + jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target); + EmbeddedFlinkCluster.submitJob(jobConf); + } + + @Test + public void testHadoopToPrintOrc() throws Exception { + Path source = Paths.get(HadoopSourceITCase.class.getClassLoader() + 
.getResource("source/test_orc") + .toURI() + .getPath()); + + Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString()); + Files.copy(source, target); + Configuration conf = FILESYSTEM.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); + jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target); + EmbeddedFlinkCluster.submitJob(jobConf); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java deleted file mode 100644 index a06a7c05a..000000000 --- a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.hadoop.source; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; -import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; -import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; -import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class HadoopTextInputFormatITCase { - private static final Logger LOG = LoggerFactory.getLogger(HadoopTextInputFormatITCase.class); - private static FileSystem fs; - - @Before - public void setUp() throws IOException, InterruptedException { - MiniClusterUtil.setUp(); - fs = MiniClusterUtil.fileSystem; - } - - @After - public void close() { - MiniClusterUtil.shutdown(); - fs = null; - } - - @Test - public void testHadoopToPrintJson() throws Exception { - String localJsonFile = "test_namespace/source/test.json"; - String remoteJsonFile = "/test_namespace/source/test.json"; - Configuration conf = fs.getConf(); - String defaultFS = conf.get("fs.defaultFS"); - LOG.info("fs.defaultFS: {}", defaultFS); - ClassLoader classLoader = JobConfUtils.class.getClassLoader(); - fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); - BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_text.json"); - jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + remoteJsonFile); - EmbeddedFlinkCluster.submitJob(jobConf); - } -} diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json b/bitsail-connectors/connector-hadoop/src/test/resources/source/test.json similarity index 100% rename from bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json rename to bitsail-connectors/connector-hadoop/src/test/resources/source/test.json diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_orc b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc similarity index 100% rename from bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_orc rename to bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_parquet b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet similarity index 100% rename from bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_parquet rename to bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet
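
To try the migrated source end to end, a complete job configuration for the ORC path can be assembled from the pieces in this patch. The sketch below simply merges the `hadoop_to_print_hive.json` template with the two options that the IT cases set programmatically (`hadoop_inputformat_class` and `path_list`); the HDFS URI and file path are illustrative placeholders, not values taken from the patch.

```json
{
  "job": {
    "common": {
      "job_id": 313,
      "instance_id": 3123,
      "job_name": "bitsail_hadoop_to_print_test",
      "user_name": "root"
    },
    "reader": {
      "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
      "hadoop_inputformat_class": "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
      "path_list": "hdfs://127.0.0.1:9000/test_namespace/source/test_orc",
      "content_type": "json",
      "reader_parallelism_num": 1,
      "columns": [
        {"name": "id_card", "type": "int"},
        {"name": "tran_time", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "cash", "type": "int"}
      ]
    },
    "writer": {
      "class": "com.bytedance.bitsail.connector.print.sink.PrintSink",
      "content_type": "json",
      "batch_size": 1,
      "columns": [
        {"name": "id_card", "type": "int"},
        {"name": "tran_time", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "cash", "type": "int"}
      ]
    }
  }
}
```

Reading plain text files works the same way, except that `hadoop_inputformat_class` can be left at its `org.apache.hadoop.mapred.TextInputFormat` default and `content_type` then selects between the CSV and JSON deserializers.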