From ddb35af02ac0bb868f0ad017aee52b5c15349980 Mon Sep 17 00:00:00 2001 From: huahaoyu Date: Fri, 16 Dec 2022 14:09:33 +0800 Subject: [PATCH] [BitSail#106][Connector] Migrate hadoop source connector to v1 interface. --- bitsail-connectors/connector-hadoop/pom.xml | 175 ++++++++++++++++++ .../hadoop/constant/HadoopConstants.java | 23 +++ .../hadoop/error/HadoopErrorCode.java | 51 +++++ .../hadoop/option/HadoopReaderOptions.java | 48 +++++ .../connector/hadoop/source/HadoopSource.java | 97 ++++++++++ .../hadoop/source/config/HadoopConf.java | 52 ++++++ .../HadoopSourceSplitCoordinator.java | 121 ++++++++++++ .../source/reader/HadoopSourceReader.java | 152 +++++++++++++++ .../source/split/HadoopSourceSplit.java | 35 ++++ .../bitsail-connector-unified-hadoop.json | 9 + .../hadoop/source/HadoopSourceITCase.java | 66 +++++++ .../test/resources/hadoop_to_print_json.json | 58 ++++++ .../resources/test_namespace/source/test.json | 1 + bitsail-connectors/pom.xml | 1 + bitsail-dist/pom.xml | 7 + .../connectors/hadoop/hadoop-v1-example.md | 108 +++++++++++ .../documents/connectors/hadoop/hadoop-v1.md | 129 +++++++++++++ .../connectors/hadoop/hadoop-v1-example.md | 108 +++++++++++ .../documents/connectors/hadoop/hadoop-v1.md | 128 +++++++++++++ 19 files changed, 1369 insertions(+) create mode 100644 bitsail-connectors/connector-hadoop/pom.xml create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java create mode 100644 bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json create mode 100644 bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java create mode 100644 bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json create mode 100644 bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json create mode 100644 website/en/documents/connectors/hadoop/hadoop-v1-example.md create mode 100644 website/en/documents/connectors/hadoop/hadoop-v1.md create mode 100644 website/zh/documents/connectors/hadoop/hadoop-v1-example.md create mode 100644 website/zh/documents/connectors/hadoop/hadoop-v1.md diff --git a/bitsail-connectors/connector-hadoop/pom.xml b/bitsail-connectors/connector-hadoop/pom.xml new file mode 100644 index 000000000..646019a26 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/pom.xml @@ -0,0 +1,175 @@ + + + + + + bitsail-connectors + com.bytedance.bitsail + ${revision} + + 4.0.0 
+ + connector-hadoop + + + 8 + 8 + UTF-8 + + + + + + org.apache.hadoop + hadoop-common + + + log4j + log4j + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-core-asl + + + com.codahale.metrics + metrics-core + + + provided + + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + guice-servlet + com.google.inject.extensions + + + guice + com.google.inject + + + jersey-guice + com.sun.jersey.contribs + + + log4j + * + + + log4j + log4j + + + metrics-core + com.codahale.metrics + + + guava + com.google.guava + + + asm + asm + + + io.netty + netty + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-xc + + + org.codehaus.jackson + jackson-jaxrs + + + org.codehaus.jackson + jackson-core-asl + + + provided + + + + com.bytedance.bitsail + bitsail-component-format-csv + ${revision} + compile + + + + com.bytedance.bitsail + bitsail-component-format-json + ${revision} + compile + + + + + com.bytedance.bitsail + bitsail-connector-print + ${revision} + test + + + + com.bytedance.bitsail + connector-print + ${revision} + provided + + + + com.bytedance.bitsail + bitsail-connector-test + ${revision} + test + + + + org.apache.hadoop + hadoop-minicluster + ${hadoop.version} + test + + + guice + com.google.inject + + + guice-servlet + com.google.inject.extensions + + + + + diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java new file mode 100644 index 000000000..0dd973067 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java @@ -0,0 +1,23 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.constant; + +public class HadoopConstants { + public static String HADOOP_CONNECTOR_NAME = "hadoop"; + public static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; + public static final String SCHEMA = "hdfs"; +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java new file mode 100644 index 000000000..ad42bc5df --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.error; + +import com.bytedance.bitsail.common.exception.ErrorCode; + +public enum HadoopErrorCode implements ErrorCode { + + REQUIRED_VALUE("Hadoop-01", "You missed parameter which is required, please check your configuration."), + UNSUPPORTED_ENCODING("Hadoop-02", "Unsupported Encoding."), + UNSUPPORTED_COLUMN_TYPE("Hadoop-03", "Unsupported column type."), + HDFS_IO("Hadoop-04", "IO Exception."); + + private final String code; + private final String description; + + HadoopErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, + this.description); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java new file mode 100644 index 000000000..ae666f67d --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java @@ -0,0 +1,48 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.option; + +import com.bytedance.bitsail.common.annotation.Essential; +import com.bytedance.bitsail.common.option.ConfigOption; +import com.bytedance.bitsail.common.option.ReaderOptions; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; +import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX; + +public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions { + @Essential + ConfigOption DEFAULT_FS = + key(READER_PREFIX + "defaultFS") + .noDefaultValue(String.class); + @Essential + ConfigOption PATH_LIST = + key(READER_PREFIX + "path_list") + .noDefaultValue(String.class); + + @Essential + ConfigOption CONTENT_TYPE = + key(READER_PREFIX + "content_type") + .noDefaultValue(String.class); + + ConfigOption READER_PARALLELISM_NUM = + key(READER_PREFIX + "reader_parallelism_num") + .noDefaultValue(Integer.class); + + ConfigOption DEFAULT_HADOOP_PARALLELISM_THRESHOLD = + key(READER_PREFIX + "default_hadoop_parallelism_threshold") + .defaultValue(2); +} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java new file mode 100644 index 000000000..45e675456 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java @@ -0,0 +1,97 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.base.connector.reader.v1.Boundedness; +import com.bytedance.bitsail.base.connector.reader.v1.Source; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.base.execution.ExecutionEnviron; +import com.bytedance.bitsail.base.extension.ParallelismComputable; +import com.bytedance.bitsail.base.parallelism.ParallelismAdvice; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants; +import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.connector.hadoop.source.coordinator.HadoopSourceSplitCoordinator; +import com.bytedance.bitsail.connector.hadoop.source.reader.HadoopSourceReader; +import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class HadoopSource implements Source, ParallelismComputable { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSource.class); + + private BitSailConfiguration readerConfiguration; + private List hadoopPathList; + + @Override + public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException { + this.readerConfiguration = readerConfiguration; + hadoopPathList = Arrays.asList(readerConfiguration.getNecessaryOption(HadoopReaderOptions.PATH_LIST, HadoopErrorCode.REQUIRED_VALUE).split(",")); + } + + @Override + public Boundedness getSourceBoundedness() { + return Boundedness.BOUNDEDNESS; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) { + return new HadoopSourceReader(readerConfiguration, readerContext); + } + + @Override + public SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator.Context coordinatorContext) { + return new HadoopSourceSplitCoordinator(readerConfiguration, coordinatorContext, hadoopPathList); + } + + @Override + public String getReaderName() { + return HadoopConstants.HADOOP_CONNECTOR_NAME; + } + + @Override + public TypeInfoConverter createTypeInfoConverter() { + return new BitSailTypeInfoConverter(); + } + + @Override + public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception { + int adviceParallelism; + if (selfConf.fieldExists(HadoopReaderOptions.READER_PARALLELISM_NUM)) { + adviceParallelism = selfConf.get(HadoopReaderOptions.READER_PARALLELISM_NUM); + } else { + int parallelismThreshold = selfConf.get(HadoopReaderOptions.DEFAULT_HADOOP_PARALLELISM_THRESHOLD); + adviceParallelism = Math.max(hadoopPathList.size() / parallelismThreshold, 1); + } + LOG.info("Parallelism for this job will set to {}.", adviceParallelism); + return ParallelismAdvice.builder() + .adviceParallelism(adviceParallelism) + .enforceDownStreamChain(true) + .build(); + } +} diff --git 
a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java new file mode 100644 index 000000000..d9f279ee1 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/config/HadoopConf.java @@ -0,0 +1,52 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source.config; + +import lombok.Data; +import org.apache.hadoop.conf.Configuration; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants.HDFS_IMPL; +import static com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants.SCHEMA; + +@Data +public class HadoopConf implements Serializable { + + private Map extraOptions = new HashMap<>(); + private String hdfsNameKey; + + public HadoopConf(String hdfsNameKey) { + this.hdfsNameKey = hdfsNameKey; + } + + public String getFsHdfsImpl() { + return HDFS_IMPL; + } + + public String getSchema() { + return SCHEMA; + } + + public void setExtraOptionsForConfiguration(Configuration configuration) { + if (!extraOptions.isEmpty()) { + extraOptions.forEach(configuration::set); + } + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java new file mode 100644 index 000000000..f998b2ce7 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java @@ -0,0 +1,121 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.source.coordinator; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; + +public class HadoopSourceSplitCoordinator implements SourceSplitCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceSplitCoordinator.class); + private final Context coordinatorContext; + private final BitSailConfiguration readerConfiguration; + private final HashSet assignedHadoopSplits; + private final List hadoopPathList; + private HashSet pendingHadoopSplits; + + public HadoopSourceSplitCoordinator(BitSailConfiguration readerConfiguration, + Context coordinatorContext, List hadoopPathList) { + this.coordinatorContext = coordinatorContext; + this.readerConfiguration = readerConfiguration; + this.hadoopPathList = hadoopPathList; + this.assignedHadoopSplits = Sets.newHashSet(); + } + + @Override + public void start() { + this.pendingHadoopSplits = Sets.newHashSet(); + hadoopPathList.forEach(k -> pendingHadoopSplits.add(new HadoopSourceSplit(k))); + int readerNum = coordinatorContext.totalParallelism(); + LOG.info("Found {} readers and {} splits.", readerNum, pendingHadoopSplits.size()); + if (readerNum > pendingHadoopSplits.size()) { + LOG.error("Reader number {} is larger than split number {}.", readerNum, pendingHadoopSplits.size()); + } + } + + @Override + public void addReader(int subtaskId) { + LOG.info("Found reader {}", subtaskId); + assignSplit(subtaskId); + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + if (!splits.isEmpty()) { + LOG.info("Source reader {} return splits {}.", subtaskId, splits); + pendingHadoopSplits.addAll(splits); + assignSplit(subtaskId); + } + } + + private void assignSplit(int subtaskId) { + ArrayList currentTaskSplits = new ArrayList<>(); + int readerNum = coordinatorContext.totalParallelism(); + if (readerNum == 1) { + // if parallelism == 1, we should assign all the splits to reader + currentTaskSplits.addAll(pendingHadoopSplits); + } else { + // if parallelism > 1, according to hashCode of splitId to determine which reader to allocate the current task + for (HadoopSourceSplit hadoopSourceSplit : pendingHadoopSplits) { + int readerIndex = getReaderIndex(hadoopSourceSplit.uniqSplitId(), readerNum); + if (readerIndex == subtaskId) { + currentTaskSplits.add(hadoopSourceSplit); + } + } + } + // assign splits + coordinatorContext.assignSplit(subtaskId, currentTaskSplits); + // save the state of assigned splits + assignedHadoopSplits.addAll(currentTaskSplits); + // remove the assigned splits from pending splits + currentTaskSplits.forEach(split -> pendingHadoopSplits.remove(split)); + LOG.info("SubTask {} is assigned to [{}]", subtaskId, currentTaskSplits.stream().map(HadoopSourceSplit::uniqSplitId).collect(Collectors.joining(","))); + coordinatorContext.signalNoMoreSplits(subtaskId); + LOG.info("Finish assigning splits reader {}", subtaskId); + } + + private int getReaderIndex(String splitId, int totalReaderNum) { + return (splitId.hashCode() & Integer.MAX_VALUE) % 
totalReaderNum; + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + + } + + @Override + public EmptyState snapshotState() throws Exception { + return new EmptyState(); + } + + @Override + public void close() { + + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java new file mode 100644 index 000000000..71791a3d4 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java @@ -0,0 +1,152 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source.reader; + +import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.enumerate.ContentType; +import com.bytedance.bitsail.base.format.DeserializationSchema; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema; +import com.bytedance.bitsail.component.format.json.JsonDeserializationSchema; +import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.connector.hadoop.source.config.HadoopConf; +import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; + +import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import static com.bytedance.bitsail.common.option.ReaderOptions.BaseReaderOptions.CONTENT_TYPE; + +public class HadoopSourceReader implements SourceReader { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceReader.class); + private static final long serialVersionUID = 1L; + private final BitSailConfiguration readerConfiguration; + private final DeserializationSchema deserializationSchema; + private final Context readerContext; + private final HadoopConf hadoopConf; + private final HashSet assignedHadoopSplits; + private final HashSet finishedHadoopSplits; + private boolean noMoreSplits; + + public 
HadoopSourceReader(BitSailConfiguration readerConfiguration, Context readerContext) { + this.readerConfiguration = readerConfiguration; + this.readerContext = readerContext; + this.assignedHadoopSplits = Sets.newHashSet(); + this.finishedHadoopSplits = Sets.newHashSet(); + this.noMoreSplits = false; + + String defaultFS = readerConfiguration.getNecessaryOption(HadoopReaderOptions.DEFAULT_FS, HadoopErrorCode.REQUIRED_VALUE); + this.hadoopConf = new HadoopConf(defaultFS); + ContentType contentType = ContentType.valueOf(readerConfiguration.getNecessaryOption(CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase()); + switch (contentType) { + case CSV: + deserializationSchema = + new CsvDeserializationSchema(readerConfiguration, readerContext.getTypeInfos(), readerContext.getFieldNames()); + break; + case JSON: + deserializationSchema = + new JsonDeserializationSchema(readerConfiguration, readerContext.getTypeInfos(), readerContext.getFieldNames()); + ; + break; + default: + throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType); + } + } + + @Override + public void start() { + + } + + @Override + public void pollNext(SourcePipeline pipeline) throws Exception { + for (HadoopSourceSplit sourceSplit : assignedHadoopSplits) { + String hadoopPath = sourceSplit.uniqSplitId(); + LOG.info("Start to process split: {}", hadoopPath); + Configuration conf = getConfiguration(hadoopConf); + FileSystem fs = FileSystem.get(conf); + Path filePath = new Path(hadoopPath); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) { + reader.lines().forEach(line -> { + try { + Row row = deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8)); + pipeline.output(row); + } catch (IOException e) { + throw BitSailException.asBitSailException(HadoopErrorCode.HDFS_IO, e); + } + }); + } + finishedHadoopSplits.add(sourceSplit); + } + assignedHadoopSplits.removeAll(finishedHadoopSplits); + } + + @Override + public void notifyNoMoreSplits() { + LOG.info("Subtask {} received no more split signal.", readerContext.getIndexOfSubtask()); + noMoreSplits = true; + } + + @Override + public void addSplits(List splitList) { + assignedHadoopSplits.addAll(splitList); + } + + @Override + public boolean hasMoreElements() { + if (noMoreSplits) { + return CollectionUtils.size(assignedHadoopSplits) != 0; + } + return true; + } + + private Configuration getConfiguration(HadoopConf hadoopConf) { + Configuration configuration = new Configuration(); + configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); + configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); + hadoopConf.setExtraOptionsForConfiguration(configuration); + return configuration; + } + + @Override + public List snapshotState(long checkpointId) { + return Collections.emptyList(); + } + + @Override + public void close() throws Exception { + + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java new file mode 100644 index 000000000..6dd0ffbe2 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java @@ -0,0 +1,35 @@ +/* + * Copyright 2022 Bytedance Ltd. 
and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source.split; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit; + +public class HadoopSourceSplit implements SourceSplit { + + private static final long serialVersionUID = 1L; + + private final String splitId; + + public HadoopSourceSplit(String splitId) { + this.splitId = splitId; + } + + @Override + public String uniqSplitId() { + return splitId; + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json b/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json new file mode 100644 index 000000000..5ec9bb9d9 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json @@ -0,0 +1,9 @@ +{ + "name": "bitsail-connector-unified-hadoop", + "classes": [ + "com.bytedance.bitsail.connector.hadoop.source.HadoopSource" + ], + "libs": [ + "connector-hadoop-${version}.jar" + ] +} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java new file mode 100644 index 000000000..4fd1cf3da --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java @@ -0,0 +1,66 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class HadoopSourceITCase { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceITCase.class); + private static FileSystem fs; + + @Before + public void setUp() throws IOException, InterruptedException { + MiniClusterUtil.setUp(); + fs = MiniClusterUtil.fileSystem; + } + + @After + public void close() { + MiniClusterUtil.shutdown(); + fs = null; + } + + @Test + public void testHadoopToPrintJson() throws Exception { + String localJsonFile = "test_namespace/source/test.json"; + String remoteJsonFile = "/test_namespace/source/test.json"; + Configuration conf = fs.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + ClassLoader classLoader = JobConfUtils.class.getClassLoader(); + fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_json.json"); + jobConf.set(HadoopReaderOptions.DEFAULT_FS, defaultFS); + jobConf.set(HadoopReaderOptions.PATH_LIST, remoteJsonFile); + EmbeddedFlinkCluster.submitJob(jobConf); + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json new file mode 100644 index 000000000..e62363870 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_json.json @@ -0,0 +1,58 @@ +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.print.sink.PrintSink", + "content_type": "json", + "batch_size": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json b/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json new file mode 100644 index 000000000..1eb170ed1 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json @@ -0,0 +1 @@ 
+{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]} \ No newline at end of file diff --git a/bitsail-connectors/pom.xml b/bitsail-connectors/pom.xml index b9e4d7963..9a0748cfa 100644 --- a/bitsail-connectors/pom.xml +++ b/bitsail-connectors/pom.xml @@ -46,6 +46,7 @@ connector-druid connector-assert connector-selectdb + connector-hadoop diff --git a/bitsail-dist/pom.xml b/bitsail-dist/pom.xml index 285d542a1..4009c554f 100644 --- a/bitsail-dist/pom.xml +++ b/bitsail-dist/pom.xml @@ -272,6 +272,13 @@ ${revision} provided + + + com.bytedance.bitsail + connector-hadoop + ${revision} + provided + diff --git a/website/en/documents/connectors/hadoop/hadoop-v1-example.md b/website/en/documents/connectors/hadoop/hadoop-v1-example.md new file mode 100644 index 000000000..914aba5ac --- /dev/null +++ b/website/en/documents/connectors/hadoop/hadoop-v1-example.md @@ -0,0 +1,108 @@ +# Hadoop connector examples + +Parent document: [hadoop-connector](./hadoop.md) + +The following configuration shows how to organize parameter configuration to read the following json format hdfs file. + +- Example json data +```json +{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]} +``` + + +- The `local Hadoop environment` is used for testing, and the sample json data is uploaded to the `/test_namespace/source directory` of hdfs +- Configuration file used to read the above hdfs file: + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +The following configuration shows how to organize parameter configuration to read the following csv format hdfs file. 
+ +- Example csv data + +```csv +1,100001,100.001,text_0001,2020-01-01 +2,100002,100.002,text_0002,2020-01-02 +``` + + +- The `local Hadoop environment` is used for testing, and the sample json data is uploaded to the `/test_namespace/source directory` of hdfs +- Configuration file used to read the above hdfs file: + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.csv", + "content_type":"csv", + "reader_parallelism_num": 1, + "columns": [ + { + "name": "id", + "type": "long" + }, + { + "name": "int_type", + "type": "int" + }, + { + "name": "double_type", + "type": "double" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "p_date", + "type": "date" + } + ] + } + } +} +``` + diff --git a/website/en/documents/connectors/hadoop/hadoop-v1.md b/website/en/documents/connectors/hadoop/hadoop-v1.md new file mode 100644 index 000000000..13bd343c3 --- /dev/null +++ b/website/en/documents/connectors/hadoop/hadoop-v1.md @@ -0,0 +1,129 @@ +# Hadoop connector + +Parent document: [connectors](../introduction.md) + + +## Main function + +Hadoop connector can be used to read hdfs files in batch scenarios. Its function points mainly include: + + - Support reading files in multiple hdfs directories at the same time + - Support reading hdfs files of various formats + +## Maven dependency + +```text + + com.bytedance.bitsail + connector-hadoop + ${revision} + +``` + +## Supported data types + - Basic data types supported by Hadoop connectors: + - Integer type: + - short + - int + - long + - biginterger + - Float type: + - float + - double + - bigdecimal + - Time type: + - timestamp + - date + - time + - String type: + - string + - Bool type: + - boolean + - Binary type: + - binary +- Composited data types supported by Hadoop connectors: + - map + - list + +## Parameters + +The following mentioned parameters should be added to `job.reader` block when using, for example: + +```json +{ + "job": { + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +### Necessary parameters + +| Param name | Required | Optional value | Description | +| :----------- | :------- | :------------- | :----------------------------------------------------------- | +| class | Yes | | Class name of hadoop connector, v1 connector is `com.bytedance.bitsail.connector.hadoop.source.HadoopSource` | +| defaultFS | Yes | | NameNode URI | +| path_list | Yes | | Specifies the path of the read in file. Multiple paths can be specified, separated by `','` | +| content_type | Yes | JSON
CSV | Specify the format of the read in file. | +| columns | Yes | | Describing fields' names and types | + +### Optional parameters +| Param name | Required | Optional value | Description | +| :----------------------------------- | :------- | :------------- | :-------------------------------------------------------- | +| default_hadoop_parallelism_threshold | No | | The number of files read by each reader, the default is 2 | +| reader_parallelism_num | No | | Reader parallelism | + + +## Supported format + +Support the following formats: + +- [JSON](#jump_json) +- [CSV](#jump_csv) + +### JSON +It supports parsing text files in json format. Each line is required to be a standard json string. + +### CSV +Support parsing of text files in csv format. Each line is required to be a standard csv string. + +The following parameters are supported to adjust the csv parsing style: + + +| Parameter name | Default value | Description | +|-----------------------------------|---------------|----------------------------------------------------------------------------| +| `job.common.csv_delimiter` | `','` | csv delimiter | +| `job.common.csv_escape` | | escape character | +| `job.common.csv_quote` | | quote character | +| `job.common.csv_with_null_string` | | Specify the conversion value of null field. It is not converted by default | + +---- + + +## Related document + +Configuration examples: [hadoop-v1-connector-example](./hadoop-v1-example.md) diff --git a/website/zh/documents/connectors/hadoop/hadoop-v1-example.md b/website/zh/documents/connectors/hadoop/hadoop-v1-example.md new file mode 100644 index 000000000..625622868 --- /dev/null +++ b/website/zh/documents/connectors/hadoop/hadoop-v1-example.md @@ -0,0 +1,108 @@ +# Hadoop连接器使用示例 + +上级文档: [hadoop连接器](./hadoop_zh.md) + +下面展示了如何使用用户参数配置读取如下json格式hdfs文件。 + +- 示例json数据 +```json +{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]} +``` + + +- 测试时使用本地Hadoop环境,需自行配置,将示例json数据上传至hdfs的/test_namespace/source目录下 +- 用于读取上述格式hdfs文件的配置 + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +下面展示了如何使用用户参数配置读取如下csv格式hdfs文件。 + +- 示例csv数据 + +```csv +1,100001,100.001,text_0001,2020-01-01 +2,100002,100.002,text_0002,2020-01-02 +``` + + +- 测试时使用本地Hadoop环境,需自行配置,将示例json数据上传至hdfs的/test_namespace/source目录下 +- 用于读取上述格式hdfs文件的配置 + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.csv", + "content_type":"csv", + "reader_parallelism_num": 1, + "columns": [ + { + "name": "id", + "type": "long" + }, + { + "name": "int_type", + "type": "int" + }, + { + "name": "double_type", + "type": "double" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "p_date", + 
"type": "date" + } + ] + } + } +} +``` + diff --git a/website/zh/documents/connectors/hadoop/hadoop-v1.md b/website/zh/documents/connectors/hadoop/hadoop-v1.md new file mode 100644 index 000000000..ff4a0458f --- /dev/null +++ b/website/zh/documents/connectors/hadoop/hadoop-v1.md @@ -0,0 +1,128 @@ +# Hadoop连接器 + +上级文档: [connectors](../introduction_zh.md) + + +## 主要功能 + +Hadoop连接器可用于批式场景下的hdfs文件读取。其功能点主要包括: + + - 支持同时读取多个hdfs目录下的文件 + - 支持读取多种格式的hdfs文件 + +## 依赖引入 + +```text + + com.bytedance.bitsail + connector-hadoop + ${revision} + +``` + +## 支持的数据类型 + - 支持的基础数据类型如下: + - 整数类型: + - short + - int + - long + - bitinteger + - 浮点类型: + - float + - double + - bigdecimal + - 时间类型: + - timestamp + - date + - time + - 字符类型: + - string + - 布尔类型: + - boolean + - 二进制类型: + - binary + - 支持的复杂数据类型包括: + - map + - list + +## 主要参数 + +以下参数使用在`job.reader`配置中,实际使用时请注意路径前缀。示例: +```json +{ + "job": { + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "defaultFS": "hdfs://127.0.0.1:9000/", + "path_list": "/test_namespace/source/test.json", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +### 必需参数 + +| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 | +| :----------- | :----------- | :---------- | :----------------------------------------------------------- | +| class | 是 | | Hadoop读连接器类名,v1 connector为为`com.bytedance.bitsail.connector.hadoop.source.HadoopSource` | +| defaultFS | 是 | | NameNode URI | +| path_list | 是 | | 指定读入文件的路径。可指定多个路径,使用`','`分隔 | +| content_type | 是 | JSON
CSV | 指定读入文件的格式 | +| columns | 是 | | 数据字段名称及类型 | + +### 可选参数 +| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 | +| :----------------------------------- | :----------- | :--------- | :-------------------------------- | +| default_hadoop_parallelism_threshold | 否 | | 每个reader读取的文件数目,默认为2 | +| reader_parallelism_num | 否 | | 读并发数,无默认值 | + + +## 支持的文件格式 + +支持对以下格式的文件进行解读: + +- [JSON](#jump_json) +- [CSV](#jump_csv) + +### JSON +支持对json格式的文本文件进行解析,要求每行均为标准的json字符串。 + +### CSV +支持对csv格式的文本文件进行解析,要求每行均为标准的csv字符串。 +支持以下参数对csv解析方式进行调整: + + +| 参数名称 | 参数默认值 | 参数说明 | +|-----------------------------------|-------|--------------------| +| `job.common.csv_delimiter` | `','` | csv分隔符 | +| `job.common.csv_escape` | | escape字符 | +| `job.common.csv_quote` | | quote字符 | +| `job.common.csv_with_null_string` | | 指定null字段的转化值,默认不转化 | + +---- + + +## 相关文档 + +配置示例文档 [hadoop v1连接器示例](./hadoop-v1-example.md) +
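
For reference, the connector sizes reader parallelism in `HadoopSource#getParallelismAdvice` as `max(path_list.size() / default_hadoop_parallelism_threshold, 1)` and routes each path-level split to a reader in `HadoopSourceSplitCoordinator#getReaderIndex` by hashing the split id. The following is a minimal standalone sketch of that arithmetic only; the class name, path values, and the `main` driver are illustrative and not part of the connector.

```java
import java.util.Arrays;
import java.util.List;

public class HadoopSplitAssignmentSketch {

  // Mirrors HadoopSource#getParallelismAdvice: one reader per `threshold` paths, at least 1.
  static int adviseParallelism(List<String> paths, int threshold) {
    return Math.max(paths.size() / threshold, 1);
  }

  // Mirrors HadoopSourceSplitCoordinator#getReaderIndex: non-negative hash modulo reader count.
  static int readerIndex(String splitId, int totalReaderNum) {
    return (splitId.hashCode() & Integer.MAX_VALUE) % totalReaderNum;
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList(
        "/test_namespace/source/a.json",
        "/test_namespace/source/b.json",
        "/test_namespace/source/c.json",
        "/test_namespace/source/d.json");

    // default_hadoop_parallelism_threshold defaults to 2, so 4 paths -> 2 readers.
    int parallelism = adviseParallelism(paths, 2);
    System.out.println("advised parallelism: " + parallelism);

    for (String path : paths) {
      // Each path becomes one HadoopSourceSplit whose split id is the path itself.
      System.out.println(path + " -> reader " + readerIndex(path, parallelism));
    }
  }
}
```

Because assignment is purely hash-based, split counts per reader are roughly balanced only when the number of paths is well above the reader count, which is why the coordinator logs an error when readers outnumber splits.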
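`HadoopSourceReader#pollNext` opens each assigned path through the Hadoop `FileSystem` API, reads it line by line, and hands every line to the configured `CsvDeserializationSchema` or `JsonDeserializationSchema` to produce a `Row`. The sketch below shows only the HDFS read path in isolation, assuming the same test NameNode URI and file used in the integration test; it prints raw lines instead of building BitSail rows, and the class name is illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class HdfsLineReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same settings the reader derives from the job config and HadoopConstants:
    conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000/");
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

    try (FileSystem fs = FileSystem.get(conf);
         BufferedReader reader = new BufferedReader(new InputStreamReader(
             fs.open(new Path("/test_namespace/source/test.json")), StandardCharsets.UTF_8))) {
      // HadoopSourceReader deserializes each line into a Row and emits it to the pipeline;
      // here we simply print the raw text.
      reader.lines().forEach(System.out::println);
    }
  }
}
```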