diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml
index 3a89a1e11..a7752388a 100644
--- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml
+++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml
@@ -26,9 +26,6 @@
bitsail-connector-hadoop
-
-
-
com.bytedance.bitsail
diff --git a/bitsail-connectors/connector-hadoop/pom.xml b/bitsail-connectors/connector-hadoop/pom.xml
new file mode 100644
index 000000000..37182aafe
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright 2022 Bytedance Ltd. and/or its affiliates.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>bitsail-connectors</artifactId>
+    <groupId>com.bytedance.bitsail</groupId>
+    <version>${revision}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>connector-hadoop</artifactId>
+
+  <properties>
+    <maven.compiler.source>8</maven.compiler.source>
+    <maven.compiler.target>8</maven.compiler.target>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.bytedance.bitsail</groupId>
+      <artifactId>bitsail-shaded-hadoop</artifactId>
+      <version>${revision}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.bytedance.bitsail</groupId>
+      <artifactId>bitsail-shaded-hive</artifactId>
+      <version>${revision}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>netty-common</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>netty-buffer</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.bytedance.bitsail</groupId>
+      <artifactId>bitsail-component-format-csv</artifactId>
+      <version>${revision}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.bytedance.bitsail</groupId>
+      <artifactId>bitsail-component-format-json</artifactId>
+      <version>${revision}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.bytedance.bitsail</groupId>
+      <artifactId>bitsail-connector-print</artifactId>
+      <version>${revision}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.bytedance.bitsail</groupId>
+      <artifactId>connector-print</artifactId>
+      <version>${revision}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.bytedance.bitsail</groupId>
+      <artifactId>bitsail-connector-test</artifactId>
+      <version>${revision}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>hppc</artifactId>
+          <groupId>com.carrotsearch</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java
new file mode 100644
index 000000000..60075afd3
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.constant;
+
+public class HadoopConstants {
+ public static String HADOOP_CONNECTOR_NAME = "hadoop";
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java
new file mode 100644
index 000000000..ad42bc5df
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.error;
+
+import com.bytedance.bitsail.common.exception.ErrorCode;
+
+public enum HadoopErrorCode implements ErrorCode {
+
+ REQUIRED_VALUE("Hadoop-01", "You missed parameter which is required, please check your configuration."),
+ UNSUPPORTED_ENCODING("Hadoop-02", "Unsupported Encoding."),
+ UNSUPPORTED_COLUMN_TYPE("Hadoop-03", "Unsupported column type."),
+ HDFS_IO("Hadoop-04", "IO Exception.");
+
+ private final String code;
+ private final String description;
+
+ HadoopErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s].", this.code,
+ this.description);
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java
new file mode 100644
index 000000000..7327dc176
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.format;
+
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.model.ColumnInfo;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.common.typeinfo.TypeInfo;
+import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
+import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;
+
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class HiveInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {
+ private final BitSailConfiguration deserializationConfiguration;
+ private final TypeInfo<?>[] typeInfos;
+ private final String[] fieldNames;
+ private final StructObjectInspector inspector;
+ public HiveInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
+ TypeInfo<?>[] typeInfos,
+ String[] fieldNames) {
+
+ this.deserializationConfiguration = deserializationConfiguration;
+ this.typeInfos = typeInfos;
+ this.fieldNames = fieldNames;
+
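+ // Build the SerDe properties ("columns" and "columns.types") from the configured column schema.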
+ List<ColumnInfo> columnInfos = deserializationConfiguration.get(HadoopReaderOptions.COLUMNS);
+ Properties p = new Properties();
+ String columns = columnInfos.stream().map(ColumnInfo::getName).collect(Collectors.joining(","));
+ String columnsTypes = columnInfos.stream().map(ColumnInfo::getType).collect(Collectors.joining(":"));
+ p.setProperty("columns", columns);
+ p.setProperty("columns.types", columnsTypes);
+ String inputFormatClass = deserializationConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS);
+ try {
+ switch (inputFormatClass) {
+ case "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat": {
+ OrcSerde serde = new OrcSerde();
+ serde.initialize(new JobConf(), p);
+ this.inspector = (StructObjectInspector) serde.getObjectInspector();
+ break;
+ }
+ case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat": {
+ ParquetHiveSerDe serde = new ParquetHiveSerDe();
+ serde.initialize(new JobConf(), p);
+ this.inspector = (StructObjectInspector) serde.getObjectInspector();
+ break;
+ }
+ default:
+ throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported input format class: " + inputFormatClass);
+ }
+ } catch (SerDeException e) {
+ throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_COLUMN_TYPE, "unsupported column information.");
+ }
+ }
+
+ @Override
+ public Row deserialize(Writable message) {
+ int arity = fieldNames.length;
+ List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+ Row row = new Row(arity);
+ for (int i = 0; i < arity; ++i) {
+ Object writableData = inspector.getStructFieldData(message, fields.get(i));
+ row.setField(i, getWritableValue(writableData));
+ }
+ return row;
+ }
+
+ @Override
+ public boolean isEndOfStream(Row nextElement) {
+ return false;
+ }
+
+ private Object getWritableValue(Object writable) {
+ Object ret;
+
+ if (writable == null) {
+ ret = null;
+ } else if (writable instanceof IntWritable) {
+ ret = ((IntWritable) writable).get();
+ } else if (writable instanceof Text) {
+ ret = writable.toString();
+ } else if (writable instanceof LongWritable) {
+ ret = ((LongWritable) writable).get();
+ } else if (writable instanceof ByteWritable) {
+ ret = ((ByteWritable) writable).get();
+ } else if (writable instanceof DateWritable) {
+ ret = ((DateWritable) writable).get();
+ } else if (writable instanceof DoubleWritable) {
+ ret = ((DoubleWritable) writable).get();
+ } else if (writable instanceof TimestampWritable) {
+ ret = ((TimestampWritable) writable).getTimestamp();
+ } else if (writable instanceof FloatWritable) {
+ ret = ((FloatWritable) writable).get();
+ } else if (writable instanceof BooleanWritable) {
+ ret = ((BooleanWritable) writable).get();
+ } else if (writable instanceof BytesWritable) {
+ BytesWritable bytesWritable = (BytesWritable) writable;
+ byte[] bytes = bytesWritable.getBytes();
+ ret = new byte[bytesWritable.getLength()];
+ System.arraycopy(bytes, 0, ret, 0, bytesWritable.getLength());
+ } else if (writable instanceof HiveDecimalWritable) {
+ ret = ((HiveDecimalWritable) writable).getHiveDecimal().bigDecimalValue();
+ } else if (writable instanceof ShortWritable) {
+ ret = ((ShortWritable) writable).get();
+ } else {
+ ret = writable.toString();
+ }
+ return ret;
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java
new file mode 100644
index 000000000..27a7f7cbf
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.format;
+
+import com.bytedance.bitsail.base.enumerate.ContentType;
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.common.typeinfo.TypeInfo;
+import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema;
+import com.bytedance.bitsail.component.format.json.JsonDeserializationSchema;
+import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
+import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;
+
+import org.apache.hadoop.io.Writable;
+
+public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {
+ private final BitSailConfiguration deserializationConfiguration;
+ private final TypeInfo<?>[] typeInfos;
+ private final String[] fieldNames;
+ private final DeserializationSchema<byte[], Row> deserializationSchema;
+
+ public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
+ TypeInfo<?>[] typeInfos,
+ String[] fieldNames) {
+ this.deserializationConfiguration = deserializationConfiguration;
+ this.typeInfos = typeInfos;
+ this.fieldNames = fieldNames;
+
+ ContentType contentType = ContentType.valueOf(
+ deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
+ switch (contentType) {
+ case CSV:
+ this.deserializationSchema =
+ new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
+ break;
+ case JSON:
+ this.deserializationSchema =
+ new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
+ break;
+ default:
+ throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType);
+ }
+ }
+
+ @Override
+ public Row deserialize(Writable message) {
+ return deserializationSchema.deserialize((message.toString()).getBytes());
+ }
+
+ @Override
+ public boolean isEndOfStream(Row nextElement) {
+ return false;
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java
new file mode 100644
index 000000000..74773437c
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.option;
+
+import com.bytedance.bitsail.common.annotation.Essential;
+import com.bytedance.bitsail.common.option.ConfigOption;
+import com.bytedance.bitsail.common.option.ReaderOptions;
+
+import static com.bytedance.bitsail.common.option.ConfigOptions.key;
+import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX;
+
+public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions {
+ @Essential
+ ConfigOption<String> PATH_LIST =
+ key(READER_PREFIX + "path_list")
+ .noDefaultValue(String.class);
+
+ ConfigOption<String> CONTENT_TYPE =
+ key(READER_PREFIX + "content_type")
+ .noDefaultValue(String.class);
+
+ ConfigOption<Integer> READER_PARALLELISM_NUM =
+ key(READER_PREFIX + "reader_parallelism_num")
+ .noDefaultValue(Integer.class);
+
+ ConfigOption<Integer> DEFAULT_HADOOP_PARALLELISM_THRESHOLD =
+ key(READER_PREFIX + "default_hadoop_parallelism_threshold")
+ .defaultValue(2);
+
+ ConfigOption<String> HADOOP_INPUT_FORMAT_CLASS =
+ key(READER_PREFIX + "hadoop_inputformat_class")
+ .defaultValue("org.apache.hadoop.mapred.TextInputFormat");
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java
new file mode 100644
index 000000000..19d5d6119
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.source;
+
+import com.bytedance.bitsail.base.connector.reader.v1.Boundedness;
+import com.bytedance.bitsail.base.connector.reader.v1.Source;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
+import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState;
+import com.bytedance.bitsail.base.execution.ExecutionEnviron;
+import com.bytedance.bitsail.base.extension.ParallelismComputable;
+import com.bytedance.bitsail.base.parallelism.ParallelismAdvice;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter;
+import com.bytedance.bitsail.common.type.TypeInfoConverter;
+import com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants;
+import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
+import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;
+import com.bytedance.bitsail.connector.hadoop.source.coordinator.HadoopSourceSplitCoordinator;
+import com.bytedance.bitsail.connector.hadoop.source.reader.HadoopSourceReader;
+import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class HadoopSource implements Source<Row, HadoopSourceSplit, EmptyState>, ParallelismComputable {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopSource.class);
+
+ private BitSailConfiguration readerConfiguration;
+ private List<String> hadoopPathList;
+
+ @Override
+ public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException {
+ this.readerConfiguration = readerConfiguration;
+ hadoopPathList = Arrays.asList(readerConfiguration.getNecessaryOption(HadoopReaderOptions.PATH_LIST, HadoopErrorCode.REQUIRED_VALUE).split(","));
+ }
+
+ @Override
+ public Boundedness getSourceBoundedness() {
+ return Boundedness.BOUNDEDNESS;
+ }
+
+ @Override
+ public SourceReader<Row, HadoopSourceSplit> createReader(SourceReader.Context readerContext) {
+ return new HadoopSourceReader(readerConfiguration, readerContext, hadoopPathList);
+ }
+
+ @Override
+ public SourceSplitCoordinator<HadoopSourceSplit, EmptyState> createSplitCoordinator(SourceSplitCoordinator.Context<HadoopSourceSplit, EmptyState> coordinatorContext) {
+ return new HadoopSourceSplitCoordinator(readerConfiguration, coordinatorContext, hadoopPathList);
+ }
+
+ @Override
+ public String getReaderName() {
+ return HadoopConstants.HADOOP_CONNECTOR_NAME;
+ }
+
+ @Override
+ public TypeInfoConverter createTypeInfoConverter() {
+ return new BitSailTypeInfoConverter();
+ }
+
+ @Override
+ public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception {
+ int adviceParallelism;
+ if (selfConf.fieldExists(HadoopReaderOptions.READER_PARALLELISM_NUM)) {
+ adviceParallelism = selfConf.get(HadoopReaderOptions.READER_PARALLELISM_NUM);
+ } else {
+ int parallelismThreshold = selfConf.get(HadoopReaderOptions.DEFAULT_HADOOP_PARALLELISM_THRESHOLD);
+ adviceParallelism = Math.max(hadoopPathList.size() / parallelismThreshold, 1);
+ }
+ LOG.info("Parallelism for this job will set to {}.", adviceParallelism);
+ return ParallelismAdvice.builder()
+ .adviceParallelism(adviceParallelism)
+ .enforceDownStreamChain(true)
+ .build();
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java
new file mode 100644
index 000000000..a5eeeb46b
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.source.coordinator;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
+import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;
+import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class HadoopSourceSplitCoordinator implements SourceSplitCoordinator<HadoopSourceSplit, EmptyState> {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceSplitCoordinator.class);
+ private final Context<HadoopSourceSplit, EmptyState> coordinatorContext;
+ private final BitSailConfiguration readerConfiguration;
+ private final HashSet<HadoopSourceSplit> assignedHadoopSplits;
+ private HashSet<HadoopSourceSplit> pendingHadoopSplits;
+ private final JobConf jobConf;
+ private final InputFormat<?, ?> mapredInputFormat;
+
+ public HadoopSourceSplitCoordinator(BitSailConfiguration readerConfiguration,
+ Context<HadoopSourceSplit, EmptyState> coordinatorContext, List<String> hadoopPathList) {
+ this.coordinatorContext = coordinatorContext;
+ this.readerConfiguration = readerConfiguration;
+ this.jobConf = new JobConf();
+ for (String path : hadoopPathList) {
+ FileInputFormat.addInputPath(this.jobConf, new Path(path));
+ }
+ String inputClassName = readerConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS);
+ Class<?> inputClass;
+ try {
+ inputClass = Class.forName(inputClassName);
+ this.mapredInputFormat = (InputFormat<?, ?>) inputClass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ ReflectionUtils.setConf(mapredInputFormat, jobConf);
+ this.assignedHadoopSplits = Sets.newHashSet();
+ }
+
+ @Override
+ public void start() {
+ this.pendingHadoopSplits = Sets.newHashSet();
+ int parallelismThreshold = readerConfiguration.get(HadoopReaderOptions.DEFAULT_HADOOP_PARALLELISM_THRESHOLD);
+ int readerNum = coordinatorContext.totalParallelism();
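+ // Hint the InputFormat to produce roughly parallelismThreshold splits per reader.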
+ int splitNum = readerNum * parallelismThreshold;
+ InputSplit[] splits;
+ try {
+ splits = mapredInputFormat.getSplits(jobConf, splitNum);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Arrays.stream(splits).forEach(split -> pendingHadoopSplits.add(new HadoopSourceSplit(split)));
+ LOG.info("Found {} readers and {} splits.", readerNum, pendingHadoopSplits.size());
+ if (readerNum > pendingHadoopSplits.size()) {
+ LOG.error("Reader number {} is larger than split number {}.", readerNum, pendingHadoopSplits.size());
+ }
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ LOG.info("Found reader {}", subtaskId);
+ assignSplit(subtaskId);
+ }
+
+ @Override
+ public void addSplitsBack(List<HadoopSourceSplit> splits, int subtaskId) {
+ if (!splits.isEmpty()) {
+ LOG.info("Source reader {} return splits {}.", subtaskId, splits);
+ pendingHadoopSplits.addAll(splits);
+ assignSplit(subtaskId);
+ }
+ }
+
+ private void assignSplit(int subtaskId) {
+ ArrayList<HadoopSourceSplit> currentTaskSplits = new ArrayList<>();
+ int readerNum = coordinatorContext.totalParallelism();
+ if (readerNum == 1) {
+ // if parallelism == 1, we should assign all the splits to reader
+ currentTaskSplits.addAll(pendingHadoopSplits);
+ } else {
+ // if parallelism > 1, according to hashCode of splitId to determine which reader to allocate the current task
+ for (HadoopSourceSplit hadoopSourceSplit : pendingHadoopSplits) {
+ int readerIndex = getReaderIndex(hadoopSourceSplit.uniqSplitId(), readerNum);
+ if (readerIndex == subtaskId) {
+ currentTaskSplits.add(hadoopSourceSplit);
+ }
+ }
+ }
+ // assign splits
+ coordinatorContext.assignSplit(subtaskId, currentTaskSplits);
+ // save the state of assigned splits
+ assignedHadoopSplits.addAll(currentTaskSplits);
+ // remove the assigned splits from pending splits
+ currentTaskSplits.forEach(split -> pendingHadoopSplits.remove(split));
+ LOG.info("SubTask {} is assigned to [{}]", subtaskId, currentTaskSplits.stream().map(HadoopSourceSplit::uniqSplitId).collect(Collectors.joining(",")));
+ coordinatorContext.signalNoMoreSplits(subtaskId);
+ LOG.info("Finish assigning splits reader {}", subtaskId);
+ }
+
+ private int getReaderIndex(String splitId, int totalReaderNum) {
+ return (splitId.hashCode() & Integer.MAX_VALUE) % totalReaderNum;
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+
+ }
+
+ @Override
+ public EmptyState snapshotState() throws Exception {
+ return new EmptyState();
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java
new file mode 100644
index 000000000..37cdc018a
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.source.reader;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.connector.hadoop.format.HiveInputFormatDeserializationSchema;
+import com.bytedance.bitsail.connector.hadoop.format.TextInputFormatDeserializationSchema;
+import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;
+import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+public class HadoopSourceReader<K, V> implements SourceReader<Row, HadoopSourceSplit> {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceReader.class);
+ private static final long serialVersionUID = 1L;
+ private final BitSailConfiguration readerConfiguration;
+ private final Context readerContext;
+ private final HashSet<HadoopSourceSplit> assignedHadoopSplits;
+ private final HashSet<HadoopSourceSplit> finishedHadoopSplits;
+ private boolean noMoreSplits;
+ protected JobConf jobConf;
+ private InputFormat<K, V> mapredInputFormat;
+ private K key;
+ private V value;
+ private DeserializationSchema<Writable, Row> deserializationSchema;
+ private RecordReader<K, V> recordReader;
+
+ public HadoopSourceReader(BitSailConfiguration readerConfiguration, Context readerContext, List<String> hadoopPathList) {
+ this.readerConfiguration = readerConfiguration;
+ this.readerContext = readerContext;
+ this.assignedHadoopSplits = Sets.newHashSet();
+ this.finishedHadoopSplits = Sets.newHashSet();
+ this.jobConf = new JobConf();
+ for (String path : hadoopPathList) {
+ FileInputFormat.addInputPath(this.jobConf, new Path(path));
+ }
+
+ String inputClassName = readerConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS);
+ Class<?> inputClass;
+ try {
+ inputClass = Class.forName(inputClassName);
+ this.mapredInputFormat = (InputFormat<K, V>) inputClass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ ReflectionUtils.setConf(mapredInputFormat, jobConf);
+ this.noMoreSplits = false;
+
+ if (this.mapredInputFormat instanceof TextInputFormat) {
+ deserializationSchema = new TextInputFormatDeserializationSchema(
+ readerConfiguration,
+ readerContext.getTypeInfos(),
+ readerContext.getFieldNames());
+ } else if (this.mapredInputFormat instanceof MapredParquetInputFormat
+ || this.mapredInputFormat instanceof OrcInputFormat) {
+ deserializationSchema = new HiveInputFormatDeserializationSchema(
+ readerConfiguration,
+ readerContext.getTypeInfos(),
+ readerContext.getFieldNames());
+ }
+ }
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
+ for (HadoopSourceSplit sourceSplit : assignedHadoopSplits) {
+
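+ // Rebuild the Hadoop InputSplit from its serialized bytes before creating a RecordReader for it.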
+ sourceSplit.initInputSplit(jobConf);
+ LOG.info("Start to process split: {}", sourceSplit.uniqSplitId());
+ this.recordReader = this.mapredInputFormat.getRecordReader(sourceSplit.getHadoopInputSplit(), jobConf, Reporter.NULL);
+ if (this.recordReader instanceof Configurable) {
+ ((Configurable) this.recordReader).setConf(jobConf);
+ }
+ key = this.recordReader.createKey();
+ value = this.recordReader.createValue();
+ while (this.recordReader.next(key, value)) {
+ Row row = deserializationSchema.deserialize((Writable) value);
+ pipeline.output(row);
+ }
+ finishedHadoopSplits.add(sourceSplit);
+ }
+ assignedHadoopSplits.removeAll(finishedHadoopSplits);
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ LOG.info("Subtask {} received no more split signal.", readerContext.getIndexOfSubtask());
+ noMoreSplits = true;
+ }
+
+ @Override
+ public void addSplits(List<HadoopSourceSplit> splitList) {
+ assignedHadoopSplits.addAll(splitList);
+ }
+
+ @Override
+ public boolean hasMoreElements() {
+ if (noMoreSplits) {
+ return CollectionUtils.size(assignedHadoopSplits) != 0;
+ }
+ return true;
+ }
+
+ @Override
+ public List<HadoopSourceSplit> snapshotState(long checkpointId) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void close() throws Exception {
+ recordReader.close();
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java
new file mode 100644
index 000000000..5b25b993e
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.source.split;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class HadoopSourceSplit implements SourceSplit {
+ private static final long serialVersionUID = 1L;
+ private final Class<? extends InputSplit> splitType;
+ private transient InputSplit hadoopInputSplit;
+ private byte[] hadoopInputSplitByteArray;
+
+ public HadoopSourceSplit(InputSplit inputSplit) {
+ if (inputSplit == null) {
+ throw new NullPointerException("Hadoop input split must not be null");
+ }
+
+ this.splitType = inputSplit.getClass();
+ this.hadoopInputSplit = inputSplit;
+ }
+
+ public InputSplit getHadoopInputSplit() {
+ return this.hadoopInputSplit;
+ }
+
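+ // Re-create the wrapped InputSplit lazily on the reader side; it is transported as Writable bytes rather than via Java serialization.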
+ public void initInputSplit(JobConf jobConf) {
+ if (this.hadoopInputSplit != null) {
+ return;
+ }
+
+ checkNotNull(hadoopInputSplitByteArray);
+
+ try {
+ this.hadoopInputSplit = (InputSplit) WritableFactories.newInstance(splitType);
+
+ if (this.hadoopInputSplit instanceof Configurable) {
+ ((Configurable) this.hadoopInputSplit).setConf(jobConf);
+ } else if (this.hadoopInputSplit instanceof JobConfigurable) {
+ ((JobConfigurable) this.hadoopInputSplit).configure(jobConf);
+ }
+
+ if (hadoopInputSplitByteArray != null) {
+ try (ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(hadoopInputSplitByteArray))) {
+ this.hadoopInputSplit.readFields(objectInputStream);
+ }
+
+ this.hadoopInputSplitByteArray = null;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
+ }
+ }
+
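+ // Serialize the InputSplit through its Writable interface, since mapred InputSplit does not implement java.io.Serializable.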
+ private void writeObject(ObjectOutputStream out) throws IOException {
+
+ if (hadoopInputSplit != null) {
+ try (
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
+ ) {
+ this.hadoopInputSplit.write(objectOutputStream);
+ objectOutputStream.flush();
+ this.hadoopInputSplitByteArray = byteArrayOutputStream.toByteArray();
+ }
+ }
+ out.defaultWriteObject();
+ }
+
+ @Override
+ public String uniqSplitId() {
+ return hadoopInputSplit.toString();
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json b/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json
new file mode 100644
index 000000000..5ec9bb9d9
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json
@@ -0,0 +1,9 @@
+{
+ "name": "bitsail-connector-unified-hadoop",
+ "classes": [
+ "com.bytedance.bitsail.connector.hadoop.source.HadoopSource"
+ ],
+ "libs": [
+ "connector-hadoop-${version}.jar"
+ ]
+}
\ No newline at end of file
diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java
new file mode 100644
index 000000000..3e0c4c0bf
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.source;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;
+import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster;
+import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class HadoopSourceITCase {
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceITCase.class);
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(new File("/tmp"));
+
+ private static FileSystem FILESYSTEM;
+ File folder;
+
+ @Before
+ public void setUp() throws IOException {
+ FILESYSTEM = LocalFileSystem.getLocal(new Configuration());
+ folder = TEMP_FOLDER.newFolder();
+ }
+
+ @After
+ public void close() {
+ FILESYSTEM = null;
+ folder = null;
+ }
+
+ @Test
+ public void testHadoopToPrintJson() throws Exception {
+ Path source = Paths.get(HadoopSourceITCase.class.getClassLoader()
+ .getResource("source/test.json")
+ .toURI()
+ .getPath());
+
+ Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString());
+ Files.copy(source, target);
+ Configuration conf = FILESYSTEM.getConf();
+ String defaultFS = conf.get("fs.defaultFS");
+ LOG.info("fs.defaultFS: {}", defaultFS);
+ BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_text.json");
+ jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target);
+ EmbeddedFlinkCluster.submitJob(jobConf);
+ }
+
+ @Test
+ public void testHadoopToPrintParquet() throws Exception {
+ Path source = Paths.get(HadoopSourceITCase.class.getClassLoader()
+ .getResource("source/test_parquet")
+ .toURI()
+ .getPath());
+
+ Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString());
+ Files.copy(source, target);
+ Configuration conf = FILESYSTEM.getConf();
+ String defaultFS = conf.get("fs.defaultFS");
+ LOG.info("fs.defaultFS: {}", defaultFS);
+ String inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
+ BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json");
+ jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat);
+ jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target);
+ EmbeddedFlinkCluster.submitJob(jobConf);
+ }
+
+ @Test
+ public void testHadoopToPrintOrc() throws Exception {
+ Path source = Paths.get(HadoopSourceITCase.class.getClassLoader()
+ .getResource("source/test_orc")
+ .toURI()
+ .getPath());
+
+ Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString());
+ Files.copy(source, target);
+ Configuration conf = FILESYSTEM.getConf();
+ String defaultFS = conf.get("fs.defaultFS");
+ LOG.info("fs.defaultFS: {}", defaultFS);
+ String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+ BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json");
+ jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat);
+ jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target);
+ EmbeddedFlinkCluster.submitJob(jobConf);
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json
new file mode 100644
index 000000000..9f013c6f9
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json
@@ -0,0 +1,56 @@
+{
+ "job": {
+ "common": {
+ "job_id": 313,
+ "instance_id": 3123,
+ "job_name": "bitsail_hadoop_to_print_test",
+ "user_name": "root"
+ },
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
+ "content_type":"json",
+ "reader_parallelism_num": 1,
+ "columns":[
+ {
+ "name":"id_card",
+ "type":"int"
+ },
+ {
+ "name":"tran_time",
+ "type":"string"
+ },
+ {
+ "name":"name",
+ "type":"string"
+ },
+ {
+ "name":"cash",
+ "type":"int"
+ }
+ ]
+ },
+ "writer": {
+ "class": "com.bytedance.bitsail.connector.print.sink.PrintSink",
+ "content_type": "json",
+ "batch_size": 1,
+ "columns":[
+ {
+ "name":"id_card",
+ "type":"int"
+ },
+ {
+ "name":"tran_time",
+ "type":"string"
+ },
+ {
+ "name":"name",
+ "type":"string"
+ },
+ {
+ "name":"cash",
+ "type":"int"
+ }
+ ]
+ }
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_text.json b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_text.json
new file mode 100644
index 000000000..8c0c7aa7a
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_text.json
@@ -0,0 +1,56 @@
+{
+ "job": {
+ "common": {
+ "job_id": 313,
+ "instance_id": 3123,
+ "job_name": "bitsail_hadoop_to_print_test",
+ "user_name": "root"
+ },
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
+ "content_type":"json",
+ "reader_parallelism_num": 1,
+ "columns": [
+ {
+ "name":"id",
+ "type": "int"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "map_string_string",
+ "type": "map"
+ },
+ {
+ "name": "array_string",
+ "type": "list"
+ }
+ ]
+ },
+ "writer": {
+ "class": "com.bytedance.bitsail.connector.print.sink.PrintSink",
+ "content_type": "json",
+ "batch_size": 1,
+ "columns": [
+ {
+ "name":"id",
+ "type": "int"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "map_string_string",
+ "type": "map"
+ },
+ {
+ "name": "array_string",
+ "type": "list"
+ }
+ ]
+ }
+ }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/source/test.json b/bitsail-connectors/connector-hadoop/src/test/resources/source/test.json
new file mode 100644
index 000000000..1eb170ed1
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/test/resources/source/test.json
@@ -0,0 +1 @@
+{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]}
\ No newline at end of file
diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc
new file mode 100644
index 000000000..4017688d4
Binary files /dev/null and b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc differ
diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet
new file mode 100644
index 000000000..7a1c114a6
Binary files /dev/null and b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet differ
diff --git a/bitsail-connectors/pom.xml b/bitsail-connectors/pom.xml
index b9e4d7963..9a0748cfa 100644
--- a/bitsail-connectors/pom.xml
+++ b/bitsail-connectors/pom.xml
@@ -46,6 +46,7 @@
        <module>connector-druid</module>
        <module>connector-assert</module>
        <module>connector-selectdb</module>
+        <module>connector-hadoop</module>
diff --git a/bitsail-dist/pom.xml b/bitsail-dist/pom.xml
index 285d542a1..4009c554f 100644
--- a/bitsail-dist/pom.xml
+++ b/bitsail-dist/pom.xml
@@ -272,6 +272,13 @@
            <version>${revision}</version>
            <scope>provided</scope>
+
+        <dependency>
+            <groupId>com.bytedance.bitsail</groupId>
+            <artifactId>connector-hadoop</artifactId>
+            <version>${revision}</version>
+            <scope>provided</scope>
+        </dependency>
diff --git a/website/en/documents/connectors/README.md b/website/en/documents/connectors/README.md
index 3e2967a2e..1ab26ce3b 100644
--- a/website/en/documents/connectors/README.md
+++ b/website/en/documents/connectors/README.md
@@ -16,6 +16,7 @@ dir:
- [FTP/SFTP connector](ftp/ftp.md)
- [FTP/SFTP-v1 connector](ftp/v1/ftp-v1.md)
- [Hadoop connector](hadoop/hadoop.md)
+- [Hadoop-v1 connector](hadoop/v1/hadoop-v1.md)
- [HBase connector](hbase/hbase.md)
- [Hive connector](hive/hive.md)
- [Hudi connector](hudi/hudi.md)
diff --git a/website/en/documents/connectors/hadoop/v1/hadoop-v1-example.md b/website/en/documents/connectors/hadoop/v1/hadoop-v1-example.md
new file mode 100644
index 000000000..c43805e08
--- /dev/null
+++ b/website/en/documents/connectors/hadoop/v1/hadoop-v1-example.md
@@ -0,0 +1,104 @@
+# Hadoop connector examples
+
+Parent document: [hadoop-connector](./hadoop-v1.md)
+
+The following configuration shows how to organize the parameter configuration to read an HDFS file in JSON format.
+
+- Example json data
+```json
+{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]}
+```
+
+
+- A local Hadoop environment is used for testing, and the sample json data is uploaded to the `/test_namespace/source` directory of HDFS
+- Configuration file used to read the above HDFS file:
+
+```json
+{
+ "job": {
+ "common": {
+ "job_id": 313,
+ "instance_id": 3123,
+ "job_name": "bitsail_hadoop_to_print_test",
+ "user_name": "root"
+ },
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
+ "content_type":"json",
+ "reader_parallelism_num": 1,
+ "columns": [
+ {
+ "name":"id",
+ "type": "int"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "map_string_string",
+ "type": "map"
+ },
+ {
+ "name": "array_string",
+ "type": "list"
+ }
+ ]
+ }
+ }
+}
+```
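+
+In addition to the options shown above, `path_list` must be set in the `job.reader` block to point at the files to read. A minimal sketch, assuming a local NameNode at `hdfs://127.0.0.1:9000` (replace it with your own HDFS address):
+
+```json
+"path_list": "hdfs://127.0.0.1:9000/test_namespace/source/test.json"
+```
+
+The same applies to the csv example below.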
+
+The following configuration shows how to organize the parameter configuration to read an HDFS file in CSV format.
+
+- Example csv data
+
+```csv
+1,100001,100.001,text_0001,2020-01-01
+2,100002,100.002,text_0002,2020-01-02
+```
+
+
+- A local Hadoop environment is used for testing, and the sample csv data is uploaded to the `/test_namespace/source` directory of HDFS
+- Configuration file used to read the above HDFS file:
+
+```json
+{
+ "job": {
+ "common": {
+ "job_id": 313,
+ "instance_id": 3123,
+ "job_name": "bitsail_hadoop_to_print_test",
+ "user_name": "root"
+ },
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
+ "content_type":"csv",
+ "reader_parallelism_num": 1,
+ "columns": [
+ {
+ "name": "id",
+ "type": "long"
+ },
+ {
+ "name": "int_type",
+ "type": "int"
+ },
+ {
+ "name": "double_type",
+ "type": "double"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "p_date",
+ "type": "date"
+ }
+ ]
+ }
+ }
+}
+```
+
diff --git a/website/en/documents/connectors/hadoop/v1/hadoop-v1.md b/website/en/documents/connectors/hadoop/v1/hadoop-v1.md
new file mode 100644
index 000000000..4a1d4a375
--- /dev/null
+++ b/website/en/documents/connectors/hadoop/v1/hadoop-v1.md
@@ -0,0 +1,126 @@
+# Hadoop connector
+
+Parent document: [connectors](../../README.md)
+
+
+## Main function
+
+The Hadoop connector can be used to read HDFS files in batch scenarios. Its main features include:
+
+ - Support for reading files from multiple HDFS directories at the same time
+ - Support for reading HDFS files in multiple formats
+
+## Maven dependency
+
+```text
+<dependency>
+   <groupId>com.bytedance.bitsail</groupId>
+   <artifactId>connector-hadoop</artifactId>
+   <version>${revision}</version>
+</dependency>
+```
+
+## Supported data types
+ - Basic data types supported by Hadoop connectors:
+ - Integer type:
+ - short
+ - int
+ - long
+ - biginteger
+ - Float type:
+ - float
+ - double
+ - bigdecimal
+ - Time type:
+ - timestamp
+ - date
+ - time
+ - String type:
+ - string
+ - Bool type:
+ - boolean
+ - Binary type:
+ - binary
+- Composited data types supported by Hadoop connectors:
+ - map
+ - list
+
+## Parameters
+
+The following parameters should be added to the `job.reader` block, for example:
+
+```json
+{
+ "job": {
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
+ "content_type":"json",
+ "reader_parallelism_num": 1,
+ "columns": [
+ {
+ "name":"id",
+ "type": "int"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "map_string_string",
+ "type": "map"
+ },
+ {
+ "name": "array_string",
+ "type": "list"
+ }
+ ]
+ }
+ }
+}
+```
+
+### Necessary parameters
+
+| Param name | Required | Optional value | Description |
+| :----------- | :------- | :------------- | :----------------------------------------------------------- |
+| class | Yes | | Class name of the Hadoop connector; for the v1 connector it is `com.bytedance.bitsail.connector.hadoop.source.HadoopSource` |
+| path_list | Yes | | Specifies the paths of the files to read. Multiple paths can be specified, separated by `','` |
+| content_type | Yes | JSON<br/>CSV | Specifies the format of the files to read. |
+| columns | Yes | | Describes the fields' names and types |
+
+### Optional parameters
+| Param name | Required | Optional value | Description |
+| :----------------------------------- | :------- | :------------- | :--------------------------------------------------------- |
+| default_hadoop_parallelism_threshold | No | | The number of splits read by each reader, the default is 2 |
+| reader_parallelism_num | No | | Reader parallelism |
+
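+The reader also honors `hadoop_inputformat_class` (default `org.apache.hadoop.mapred.TextInputFormat`), which selects the underlying Hadoop `InputFormat`. For example, ORC files can be read with the Hive ORC input format; the snippet below is a sketch based on this connector's integration test, and the path is a placeholder for your own HDFS location:
+
+```json
+{
+  "job": {
+    "reader": {
+      "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
+      "hadoop_inputformat_class": "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+      "path_list": "hdfs://127.0.0.1:9000/test_namespace/source/test_orc"
+    }
+  }
+}
+```
+
+When an ORC or Parquet input format is used, `columns` must still describe the field names and types so that the connector can build the Hive SerDe schema.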
+
+## Supported format
+
+Support the following formats:
+
+- [JSON](#jump_json)
+- [CSV](#jump_csv)
+
+### JSON
+Supports parsing text files in json format. Each line is required to be a standard json string.
+
+### CSV
+Supports parsing text files in csv format. Each line is required to be a standard csv string.
+
+The following parameters are supported to adjust the csv parsing style:
+
+
+| Parameter name | Default value | Description |
+|-----------------------------------|---------------|----------------------------------------------------------------------------|
+| `job.common.csv_delimiter` | `','` | csv delimiter |
+| `job.common.csv_escape` | | escape character |
+| `job.common.csv_quote` | | quote character |
+| `job.common.csv_with_null_string` | | Specify the conversion value of null field. It is not converted by default |
+
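+For example, to parse semicolon-delimited files, only the delimiter needs to be overridden; an illustrative snippet (the rest of the job configuration stays unchanged):
+
+```json
+{
+  "job": {
+    "common": {
+      "csv_delimiter": ";"
+    },
+    "reader": {
+      "content_type": "csv"
+    }
+  }
+}
+```
+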
+----
+
+
+## Related document
+
+Configuration examples: [hadoop-v1-connector-example](./hadoop-v1-example.md)
diff --git a/website/zh/documents/connectors/README.md b/website/zh/documents/connectors/README.md
index 70bac6e39..3a8e14a35 100644
--- a/website/zh/documents/connectors/README.md
+++ b/website/zh/documents/connectors/README.md
@@ -16,6 +16,7 @@ dir:
- [FTP/SFTP 连接器](ftp/ftp.md)
- [FTP/SFTP-v1 连接器](ftp/v1/ftp-v1.md)
- [Hadoop 连接器](hadoop/hadoop.md)
+- [Hadoop-v1 连接器](hadoop/v1/hadoop-v1.md)
- [HBase 连接器](hbase/hbase.md)
- [Hive 连接器](hive/hive.md)
- [Hudi 连接器](hudi/hudi.md)
diff --git a/website/zh/documents/connectors/hadoop/v1/hadoop-v1-example.md b/website/zh/documents/connectors/hadoop/v1/hadoop-v1-example.md
new file mode 100644
index 000000000..a709d0691
--- /dev/null
+++ b/website/zh/documents/connectors/hadoop/v1/hadoop-v1-example.md
@@ -0,0 +1,104 @@
+# Hadoop连接器使用示例
+
+上级文档: [hadoop连接器](./hadoop-v1.md)
+
+下面展示了如何使用用户参数配置读取如下json格式hdfs文件。
+
+- 示例json数据
+```json
+{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]}
+```
+
+
+- 测试时使用本地Hadoop环境,需自行配置,将示例json数据上传至hdfs的/test_namespace/source目录下
+- 用于读取上述格式hdfs文件的配置
+
+```json
+{
+ "job": {
+ "common": {
+ "job_id": 313,
+ "instance_id": 3123,
+ "job_name": "bitsail_hadoop_to_print_test",
+ "user_name": "root"
+ },
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
+ "content_type":"json",
+ "reader_parallelism_num": 1,
+ "columns": [
+ {
+ "name":"id",
+ "type": "int"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "map_string_string",
+ "type": "map"
+ },
+ {
+ "name": "array_string",
+ "type": "list"
+ }
+ ]
+ }
+ }
+}
+```
+
+下面展示了如何使用用户参数配置读取如下csv格式hdfs文件。
+
+- 示例csv数据
+
+```csv
+1,100001,100.001,text_0001,2020-01-01
+2,100002,100.002,text_0002,2020-01-02
+```
+
+
+- 测试时使用本地Hadoop环境,需自行配置,将示例csv数据上传至hdfs的/test_namespace/source目录下
+- 用于读取上述格式hdfs文件的配置
+
+```json
+{
+ "job": {
+ "common": {
+ "job_id": 313,
+ "instance_id": 3123,
+ "job_name": "bitsail_hadoop_to_print_test",
+ "user_name": "root"
+ },
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
+ "content_type":"csv",
+ "reader_parallelism_num": 1,
+ "columns": [
+ {
+ "name": "id",
+ "type": "long"
+ },
+ {
+ "name": "int_type",
+ "type": "int"
+ },
+ {
+ "name": "double_type",
+ "type": "double"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "p_date",
+ "type": "date"
+ }
+ ]
+ }
+ }
+}
+```
+
diff --git a/website/zh/documents/connectors/hadoop/v1/hadoop-v1.md b/website/zh/documents/connectors/hadoop/v1/hadoop-v1.md
new file mode 100644
index 000000000..a3efe4587
--- /dev/null
+++ b/website/zh/documents/connectors/hadoop/v1/hadoop-v1.md
@@ -0,0 +1,125 @@
+# Hadoop连接器
+
+上级文档: [connectors](../../README.md)
+
+
+## 主要功能
+
+Hadoop连接器可用于批式场景下的hdfs文件读取。其功能点主要包括:
+
+ - 支持同时读取多个hdfs目录下的文件
+ - 支持读取多种格式的hdfs文件
+
+## 依赖引入
+
+```text
+<dependency>
+   <groupId>com.bytedance.bitsail</groupId>
+   <artifactId>connector-hadoop</artifactId>
+   <version>${revision}</version>
+</dependency>
+```
+
+## 支持的数据类型
+ - 支持的基础数据类型如下:
+ - 整数类型:
+ - short
+ - int
+ - long
+ - biginteger
+ - 浮点类型:
+ - float
+ - double
+ - bigdecimal
+ - 时间类型:
+ - timestamp
+ - date
+ - time
+ - 字符类型:
+ - string
+ - 布尔类型:
+ - boolean
+ - 二进制类型:
+ - binary
+ - 支持的复杂数据类型包括:
+ - map
+ - list
+
+## 主要参数
+
+以下参数使用在`job.reader`配置中,实际使用时请注意路径前缀。示例:
+```json
+{
+ "job": {
+ "reader": {
+ "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource",
+ "content_type":"json",
+ "reader_parallelism_num": 1,
+ "columns": [
+ {
+ "name":"id",
+ "type": "int"
+ },
+ {
+ "name": "string_type",
+ "type": "string"
+ },
+ {
+ "name": "map_string_string",
+ "type": "map"
+ },
+ {
+ "name": "array_string",
+ "type": "list"
+ }
+ ]
+ }
+ }
+}
+```
+
+### 必需参数
+
+| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 |
+|:-------------| :----------- | :---------- | :----------------------------------------------------------- |
+| class | 是 | | Hadoop读连接器类名,v1连接器为`com.bytedance.bitsail.connector.hadoop.source.HadoopSource` |
+| path_list | 是 | | 指定读入文件的路径。可指定多个路径,使用`','`分隔 |
+| content_type | 是 | JSON<br/>CSV | 指定读入文件的格式 |
+| columns | 是 | | 数据字段名称及类型 |
+
+### 可选参数
+| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 |
+| :----------------------------------- | :----------- | :--------- | :-------------------------------- |
+| default_hadoop_parallelism_threshold | 否 | | 每个reader读取的文件数目,默认为2 |
+| reader_parallelism_num | 否 | | 读并发数,无默认值 |
+
+
+## 支持的文件格式
+
+支持对以下格式的文件进行解读:
+
+- [JSON](#jump_json)
+- [CSV](#jump_csv)
+
+### JSON
+支持对json格式的文本文件进行解析,要求每行均为标准的json字符串。
+
+### CSV
+支持对csv格式的文本文件进行解析,要求每行均为标准的csv字符串。
+支持以下参数对csv解析方式进行调整:
+
+
+| 参数名称 | 参数默认值 | 参数说明 |
+|-----------------------------------|-------|--------------------|
+| `job.common.csv_delimiter` | `','` | csv分隔符 |
+| `job.common.csv_escape` | | escape字符 |
+| `job.common.csv_quote` | | quote字符 |
+| `job.common.csv_with_null_string` | | 指定null字段的转化值,默认不转化 |
+
+----
+
+
+## 相关文档
+
+配置示例文档 [hadoop v1连接器示例](./hadoop-v1-example.md)
+