diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml index 3a89a1e11..a7752388a 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml @@ -26,9 +26,6 @@ bitsail-connector-hadoop - - - com.bytedance.bitsail diff --git a/bitsail-connectors/connector-hadoop/pom.xml b/bitsail-connectors/connector-hadoop/pom.xml new file mode 100644 index 000000000..37182aafe --- /dev/null +++ b/bitsail-connectors/connector-hadoop/pom.xml @@ -0,0 +1,104 @@ + + + + + + bitsail-connectors + com.bytedance.bitsail + ${revision} + + 4.0.0 + + connector-hadoop + + + 8 + 8 + UTF-8 + + + + + + com.bytedance.bitsail + bitsail-shaded-hadoop + ${revision} + provided + + + + com.bytedance.bitsail + bitsail-shaded-hive + ${revision} + provided + + + netty-common + io.netty + + + netty-buffer + io.netty + + + + + + com.bytedance.bitsail + bitsail-component-format-csv + ${revision} + compile + + + + com.bytedance.bitsail + bitsail-component-format-json + ${revision} + compile + + + + + com.bytedance.bitsail + bitsail-connector-print + ${revision} + test + + + + com.bytedance.bitsail + connector-print + ${revision} + test + + + + com.bytedance.bitsail + bitsail-connector-test + ${revision} + test + + + hppc + com.carrotsearch + + + + + diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java new file mode 100644 index 000000000..60075afd3 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java @@ -0,0 +1,21 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.constant; + +public class HadoopConstants { + public static String HADOOP_CONNECTOR_NAME = "hadoop"; +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java new file mode 100644 index 000000000..ad42bc5df --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.error; + +import com.bytedance.bitsail.common.exception.ErrorCode; + +public enum HadoopErrorCode implements ErrorCode { + + REQUIRED_VALUE("Hadoop-01", "You missed parameter which is required, please check your configuration."), + UNSUPPORTED_ENCODING("Hadoop-02", "Unsupported Encoding."), + UNSUPPORTED_COLUMN_TYPE("Hadoop-03", "Unsupported column type."), + HDFS_IO("Hadoop-04", "IO Exception."); + + private final String code; + private final String description; + + HadoopErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, + this.description); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java new file mode 100644 index 000000000..7327dc176 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java @@ -0,0 +1,148 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.format; + +import com.bytedance.bitsail.base.format.DeserializationSchema; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.model.ColumnInfo; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.typeinfo.TypeInfo; +import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; + +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; + +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +public class HiveInputFormatDeserializationSchema implements DeserializationSchema { + private final BitSailConfiguration deserializationConfiguration; + private final TypeInfo[] typeInfos; + private final String[] fieldNames; + private final StructObjectInspector inspector; + public HiveInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration, + TypeInfo[] typeInfos, + String[] fieldNames) { + + this.deserializationConfiguration = deserializationConfiguration; + this.typeInfos = typeInfos; + this.fieldNames = fieldNames; + + List columnInfos = deserializationConfiguration.get(HadoopReaderOptions.COLUMNS); + Properties p = new Properties(); + String columns = columnInfos.stream().map(ColumnInfo::getName).collect(Collectors.joining(",")); + String columnsTypes = columnInfos.stream().map(ColumnInfo::getType).collect(Collectors.joining(":")); + p.setProperty("columns", columns); + p.setProperty("columns.types", columnsTypes); + String inputFormatClass = deserializationConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS); + try { + switch (inputFormatClass) { + case "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat": { + OrcSerde serde = new OrcSerde(); + serde.initialize(new JobConf(), p); + this.inspector = (StructObjectInspector) serde.getObjectInspector(); + break; + } + case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat": { + ParquetHiveSerDe serde = new ParquetHiveSerDe(); + serde.initialize(new JobConf(), p); + this.inspector = (StructObjectInspector) serde.getObjectInspector(); + break; + } + default: + throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported input format class: " + inputFormatClass); + } + } catch (SerDeException e) { + throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_COLUMN_TYPE, "unsupported column information."); + } + } + + @Override + public Row 
deserialize(Writable message) { + int arity = fieldNames.length; + List fields = inspector.getAllStructFieldRefs(); + Row row = new Row(arity); + for (int i = 0; i < arity; ++i) { + Object writableData = inspector.getStructFieldData(message, fields.get(i)); + row.setField(i, getWritableValue(writableData)); + } + return row; + } + + @Override + public boolean isEndOfStream(Row nextElement) { + return false; + } + + private Object getWritableValue(Object writable) { + Object ret; + + if (writable == null) { + ret = null; + } else if (writable instanceof IntWritable) { + ret = ((IntWritable) writable).get(); + } else if (writable instanceof Text) { + ret = writable.toString(); + } else if (writable instanceof LongWritable) { + ret = ((LongWritable) writable).get(); + } else if (writable instanceof ByteWritable) { + ret = ((ByteWritable) writable).get(); + } else if (writable instanceof DateWritable) { + ret = ((DateWritable) writable).get(); + } else if (writable instanceof DoubleWritable) { + ret = ((DoubleWritable) writable).get(); + } else if (writable instanceof TimestampWritable) { + ret = ((TimestampWritable) writable).getTimestamp(); + } else if (writable instanceof FloatWritable) { + ret = ((FloatWritable) writable).get(); + } else if (writable instanceof BooleanWritable) { + ret = ((BooleanWritable) writable).get(); + } else if (writable instanceof BytesWritable) { + BytesWritable bytesWritable = (BytesWritable) writable; + byte[] bytes = bytesWritable.getBytes(); + ret = new byte[bytesWritable.getLength()]; + System.arraycopy(bytes, 0, ret, 0, bytesWritable.getLength()); + } else if (writable instanceof HiveDecimalWritable) { + ret = ((HiveDecimalWritable) writable).getHiveDecimal().bigDecimalValue(); + } else if (writable instanceof ShortWritable) { + ret = ((ShortWritable) writable).get(); + } else { + ret = writable.toString(); + } + return ret; + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java new file mode 100644 index 000000000..27a7f7cbf --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/TextInputFormatDeserializationSchema.java @@ -0,0 +1,70 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.bytedance.bitsail.connector.hadoop.format;
+
+import com.bytedance.bitsail.base.enumerate.ContentType;
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.common.typeinfo.TypeInfo;
+import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema;
+import com.bytedance.bitsail.component.format.json.JsonDeserializationSchema;
+import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
+import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;
+
+import org.apache.hadoop.io.Writable;
+
+public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {
+  private final BitSailConfiguration deserializationConfiguration;
+  private final TypeInfo[] typeInfos;
+  private final String[] fieldNames;
+  private final DeserializationSchema<byte[], Row> deserializationSchema;
+
+  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
+                                              TypeInfo[] typeInfos,
+                                              String[] fieldNames) {
+    this.deserializationConfiguration = deserializationConfiguration;
+    this.typeInfos = typeInfos;
+    this.fieldNames = fieldNames;
+
+    ContentType contentType = ContentType.valueOf(
+        deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
+    switch (contentType) {
+      case CSV:
+        this.deserializationSchema =
+            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
+        break;
+      case JSON:
+        this.deserializationSchema =
+            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
+        break;
+      default:
+        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType);
+    }
+  }
+
+  @Override
+  public Row deserialize(Writable message) {
+    return deserializationSchema.deserialize((message.toString()).getBytes());
+  }
+
+  @Override
+  public boolean isEndOfStream(Row nextElement) {
+    return false;
+  }
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java
new file mode 100644
index 000000000..74773437c
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.hadoop.option;
+
+import com.bytedance.bitsail.common.annotation.Essential;
+import com.bytedance.bitsail.common.option.ConfigOption;
+import com.bytedance.bitsail.common.option.ReaderOptions;
+
+import static com.bytedance.bitsail.common.option.ConfigOptions.key;
+import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX;
+
+public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions {
+  @Essential
+  ConfigOption<String> PATH_LIST =
+      key(READER_PREFIX + "path_list")
+          .noDefaultValue(String.class);
+
+  ConfigOption<String> CONTENT_TYPE =
+      key(READER_PREFIX + "content_type")
+          .noDefaultValue(String.class);
+
+  ConfigOption<Integer> READER_PARALLELISM_NUM =
+      key(READER_PREFIX + "reader_parallelism_num")
+          .noDefaultValue(Integer.class);
+
+  ConfigOption<Integer> DEFAULT_HADOOP_PARALLELISM_THRESHOLD =
+      key(READER_PREFIX + "default_hadoop_parallelism_threshold")
+          .defaultValue(2);
+
+  ConfigOption<String> HADOOP_INPUT_FORMAT_CLASS =
+      key(READER_PREFIX + "hadoop_inputformat_class")
+          .defaultValue("org.apache.hadoop.mapred.TextInputFormat");
+}
diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java
new file mode 100644
index 000000000..19d5d6119
--- /dev/null
+++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2022 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.base.connector.reader.v1.Boundedness; +import com.bytedance.bitsail.base.connector.reader.v1.Source; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.base.execution.ExecutionEnviron; +import com.bytedance.bitsail.base.extension.ParallelismComputable; +import com.bytedance.bitsail.base.parallelism.ParallelismAdvice; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants; +import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.connector.hadoop.source.coordinator.HadoopSourceSplitCoordinator; +import com.bytedance.bitsail.connector.hadoop.source.reader.HadoopSourceReader; +import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class HadoopSource implements Source, ParallelismComputable { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSource.class); + + private BitSailConfiguration readerConfiguration; + private List hadoopPathList; + + @Override + public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException { + this.readerConfiguration = readerConfiguration; + hadoopPathList = Arrays.asList(readerConfiguration.getNecessaryOption(HadoopReaderOptions.PATH_LIST, HadoopErrorCode.REQUIRED_VALUE).split(",")); + } + + @Override + public Boundedness getSourceBoundedness() { + return Boundedness.BOUNDEDNESS; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) { + return new HadoopSourceReader(readerConfiguration, readerContext, hadoopPathList); + } + + @Override + public SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator.Context coordinatorContext) { + return new HadoopSourceSplitCoordinator(readerConfiguration, coordinatorContext, hadoopPathList); + } + + @Override + public String getReaderName() { + return HadoopConstants.HADOOP_CONNECTOR_NAME; + } + + @Override + public TypeInfoConverter createTypeInfoConverter() { + return new BitSailTypeInfoConverter(); + } + + @Override + public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception { + int adviceParallelism; + if (selfConf.fieldExists(HadoopReaderOptions.READER_PARALLELISM_NUM)) { + adviceParallelism = selfConf.get(HadoopReaderOptions.READER_PARALLELISM_NUM); + } else { + int parallelismThreshold = selfConf.get(HadoopReaderOptions.DEFAULT_HADOOP_PARALLELISM_THRESHOLD); + adviceParallelism = Math.max(hadoopPathList.size() / parallelismThreshold, 1); + } + LOG.info("Parallelism for this job will set to {}.", adviceParallelism); + return ParallelismAdvice.builder() + .adviceParallelism(adviceParallelism) + .enforceDownStreamChain(true) + .build(); + } +} diff --git 
a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java new file mode 100644 index 000000000..a5eeeb46b --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/coordinator/HadoopSourceSplitCoordinator.java @@ -0,0 +1,151 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source.coordinator; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; + +import com.google.common.collect.Sets; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; + +public class HadoopSourceSplitCoordinator implements SourceSplitCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceSplitCoordinator.class); + private final Context coordinatorContext; + private final BitSailConfiguration readerConfiguration; + private final HashSet assignedHadoopSplits; + private HashSet pendingHadoopSplits; + private final JobConf jobConf; + private final InputFormat mapredInputFormat; + + public HadoopSourceSplitCoordinator(BitSailConfiguration readerConfiguration, + Context coordinatorContext, List hadoopPathList) { + this.coordinatorContext = coordinatorContext; + this.readerConfiguration = readerConfiguration; + this.jobConf = new JobConf(); + for (String path : hadoopPathList) { + FileInputFormat.addInputPath(this.jobConf, new Path(path)); + } + String inputClassName = readerConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS); + Class inputClass; + try { + inputClass = Class.forName(inputClassName); + this.mapredInputFormat = (InputFormat) inputClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + ReflectionUtils.setConf(mapredInputFormat, jobConf); + this.assignedHadoopSplits = Sets.newHashSet(); + } + + @Override + public void start() { + this.pendingHadoopSplits = Sets.newHashSet(); + int parallelismThreshold = 
readerConfiguration.get(HadoopReaderOptions.DEFAULT_HADOOP_PARALLELISM_THRESHOLD); + int readerNum = coordinatorContext.totalParallelism(); + int splitNum = readerNum * parallelismThreshold; + InputSplit[] splits; + try { + splits = mapredInputFormat.getSplits(jobConf, splitNum); + } catch (IOException e) { + throw new RuntimeException(e); + } + Arrays.stream(splits).forEach(split -> pendingHadoopSplits.add(new HadoopSourceSplit(split))); + LOG.info("Found {} readers and {} splits.", readerNum, pendingHadoopSplits.size()); + if (readerNum > pendingHadoopSplits.size()) { + LOG.error("Reader number {} is larger than split number {}.", readerNum, pendingHadoopSplits.size()); + } + } + + @Override + public void addReader(int subtaskId) { + LOG.info("Found reader {}", subtaskId); + assignSplit(subtaskId); + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + if (!splits.isEmpty()) { + LOG.info("Source reader {} return splits {}.", subtaskId, splits); + pendingHadoopSplits.addAll(splits); + assignSplit(subtaskId); + } + } + + private void assignSplit(int subtaskId) { + ArrayList currentTaskSplits = new ArrayList<>(); + int readerNum = coordinatorContext.totalParallelism(); + if (readerNum == 1) { + // if parallelism == 1, we should assign all the splits to reader + currentTaskSplits.addAll(pendingHadoopSplits); + } else { + // if parallelism > 1, according to hashCode of splitId to determine which reader to allocate the current task + for (HadoopSourceSplit hadoopSourceSplit : pendingHadoopSplits) { + int readerIndex = getReaderIndex(hadoopSourceSplit.uniqSplitId(), readerNum); + if (readerIndex == subtaskId) { + currentTaskSplits.add(hadoopSourceSplit); + } + } + } + // assign splits + coordinatorContext.assignSplit(subtaskId, currentTaskSplits); + // save the state of assigned splits + assignedHadoopSplits.addAll(currentTaskSplits); + // remove the assigned splits from pending splits + currentTaskSplits.forEach(split -> pendingHadoopSplits.remove(split)); + LOG.info("SubTask {} is assigned to [{}]", subtaskId, currentTaskSplits.stream().map(HadoopSourceSplit::uniqSplitId).collect(Collectors.joining(","))); + coordinatorContext.signalNoMoreSplits(subtaskId); + LOG.info("Finish assigning splits reader {}", subtaskId); + } + + private int getReaderIndex(String splitId, int totalReaderNum) { + return (splitId.hashCode() & Integer.MAX_VALUE) % totalReaderNum; + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + + } + + @Override + public EmptyState snapshotState() throws Exception { + return new EmptyState(); + } + + @Override + public void close() { + + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java new file mode 100644 index 000000000..37cdc018a --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/reader/HadoopSourceReader.java @@ -0,0 +1,154 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source.reader; + +import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.format.DeserializationSchema; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.connector.hadoop.format.HiveInputFormatDeserializationSchema; +import com.bytedance.bitsail.connector.hadoop.format.TextInputFormatDeserializationSchema; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit; + +import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +public class HadoopSourceReader implements SourceReader { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceReader.class); + private static final long serialVersionUID = 1L; + private final BitSailConfiguration readerConfiguration; + private final Context readerContext; + private final HashSet assignedHadoopSplits; + private final HashSet finishedHadoopSplits; + private boolean noMoreSplits; + protected JobConf jobConf; + private InputFormat mapredInputFormat; + private K key; + private V value; + private DeserializationSchema deserializationSchema; + private RecordReader recordReader; + + public HadoopSourceReader(BitSailConfiguration readerConfiguration, Context readerContext, List hadoopPathList) { + this.readerConfiguration = readerConfiguration; + this.readerContext = readerContext; + this.assignedHadoopSplits = Sets.newHashSet(); + this.finishedHadoopSplits = Sets.newHashSet(); + this.jobConf = new JobConf(); + for (String path : hadoopPathList) { + FileInputFormat.addInputPath(this.jobConf, new Path(path)); + } + + String inputClassName = readerConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS); + Class inputClass; + try { + inputClass = Class.forName(inputClassName); + this.mapredInputFormat = (InputFormat) inputClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + ReflectionUtils.setConf(mapredInputFormat, jobConf); + this.noMoreSplits = false; + + if (this.mapredInputFormat instanceof TextInputFormat) { + deserializationSchema = new TextInputFormatDeserializationSchema( + 
readerConfiguration, + readerContext.getTypeInfos(), + readerContext.getFieldNames()); + } else if (this.mapredInputFormat instanceof MapredParquetInputFormat + || this.mapredInputFormat instanceof OrcInputFormat) { + deserializationSchema = new HiveInputFormatDeserializationSchema( + readerConfiguration, + readerContext.getTypeInfos(), + readerContext.getFieldNames()); + } + } + + @Override + public void start() { + + } + + @Override + public void pollNext(SourcePipeline pipeline) throws Exception { + for (HadoopSourceSplit sourceSplit : assignedHadoopSplits) { + + sourceSplit.initInputSplit(jobConf); + LOG.info("Start to process split: {}", sourceSplit.uniqSplitId()); + this.recordReader = this.mapredInputFormat.getRecordReader(sourceSplit.getHadoopInputSplit(), jobConf, Reporter.NULL); + if (this.recordReader instanceof Configurable) { + ((Configurable) this.recordReader).setConf(jobConf); + } + key = this.recordReader.createKey(); + value = this.recordReader.createValue(); + while (this.recordReader.next(key, value)) { + Row row = deserializationSchema.deserialize((Writable) value); + pipeline.output(row); + } + finishedHadoopSplits.add(sourceSplit); + } + assignedHadoopSplits.removeAll(finishedHadoopSplits); + } + + @Override + public void notifyNoMoreSplits() { + LOG.info("Subtask {} received no more split signal.", readerContext.getIndexOfSubtask()); + noMoreSplits = true; + } + + @Override + public void addSplits(List splitList) { + assignedHadoopSplits.addAll(splitList); + } + + @Override + public boolean hasMoreElements() { + if (noMoreSplits) { + return CollectionUtils.size(assignedHadoopSplits) != 0; + } + return true; + } + + @Override + public List snapshotState(long checkpointId) { + return Collections.emptyList(); + } + + @Override + public void close() throws Exception { + recordReader.close(); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java new file mode 100644 index 000000000..5b25b993e --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/split/HadoopSourceSplit.java @@ -0,0 +1,101 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.hadoop.source.split; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class HadoopSourceSplit implements SourceSplit { + private static final long serialVersionUID = 1L; + private final Class splitType; + private transient InputSplit hadoopInputSplit; + private byte[] hadoopInputSplitByteArray; + + public HadoopSourceSplit(InputSplit inputSplit) { + if (inputSplit == null) { + throw new NullPointerException("Hadoop input split must not be null"); + } + + this.splitType = inputSplit.getClass(); + this.hadoopInputSplit = inputSplit; + } + + public InputSplit getHadoopInputSplit() { + return this.hadoopInputSplit; + } + + public void initInputSplit(JobConf jobConf) { + if (this.hadoopInputSplit != null) { + return; + } + + checkNotNull(hadoopInputSplitByteArray); + + try { + this.hadoopInputSplit = (InputSplit) WritableFactories.newInstance(splitType); + + if (this.hadoopInputSplit instanceof Configurable) { + ((Configurable) this.hadoopInputSplit).setConf(jobConf); + } else if (this.hadoopInputSplit instanceof JobConfigurable) { + ((JobConfigurable) this.hadoopInputSplit).configure(jobConf); + } + + if (hadoopInputSplitByteArray != null) { + try (ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(hadoopInputSplitByteArray))) { + this.hadoopInputSplit.readFields(objectInputStream); + } + + this.hadoopInputSplitByteArray = null; + } + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e); + } + } + + private void writeObject(ObjectOutputStream out) throws IOException { + + if (hadoopInputSplit != null) { + try ( + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream) + ) { + this.hadoopInputSplit.write(objectOutputStream); + objectOutputStream.flush(); + this.hadoopInputSplitByteArray = byteArrayOutputStream.toByteArray(); + } + } + out.defaultWriteObject(); + } + + @Override + public String uniqSplitId() { + return hadoopInputSplit.toString(); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json b/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json new file mode 100644 index 000000000..5ec9bb9d9 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/main/resources/bitsail-connector-unified-hadoop.json @@ -0,0 +1,9 @@ +{ + "name": "bitsail-connector-unified-hadoop", + "classes": [ + "com.bytedance.bitsail.connector.hadoop.source.HadoopSource" + ], + "libs": [ + "connector-hadoop-${version}.jar" + ] +} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java new file mode 100644 index 000000000..3e0c4c0bf --- /dev/null +++ 
b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java @@ -0,0 +1,116 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class HadoopSourceITCase { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceITCase.class); + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(new File("/tmp")); + + private static FileSystem FILESYSTEM; + File folder; + + @Before + public void setUp() throws IOException { + FILESYSTEM = LocalFileSystem.getLocal(new Configuration()); + folder = TEMP_FOLDER.newFolder(); + } + + @After + public void close() { + FILESYSTEM = null; + folder = null; + } + + @Test + public void testHadoopToPrintJson() throws Exception { + Path source = Paths.get(HadoopSourceITCase.class.getClassLoader() + .getResource("source/test.json") + .toURI() + .getPath()); + + Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString()); + Files.copy(source, target); + Configuration conf = FILESYSTEM.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_text.json"); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target); + EmbeddedFlinkCluster.submitJob(jobConf); + } + + @Test + public void testHadoopToPrintParquet() throws Exception { + Path source = Paths.get(HadoopSourceITCase.class.getClassLoader() + .getResource("source/test_parquet") + .toURI() + .getPath()); + + Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString()); + Files.copy(source, target); + Configuration conf = FILESYSTEM.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + String inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); + jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target); + 
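+    // The input format is overridden to MapredParquetInputFormat above, so HadoopSourceReader resolves HiveInputFormatDeserializationSchema for the job submitted below.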
EmbeddedFlinkCluster.submitJob(jobConf); + } + + @Test + public void testHadoopToPrintOrc() throws Exception { + Path source = Paths.get(HadoopSourceITCase.class.getClassLoader() + .getResource("source/test_orc") + .toURI() + .getPath()); + + Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString()); + Files.copy(source, target); + Configuration conf = FILESYSTEM.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); + jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target); + EmbeddedFlinkCluster.submitJob(jobConf); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json new file mode 100644 index 000000000..9f013c6f9 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_hive.json @@ -0,0 +1,56 @@ +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"json", + "reader_parallelism_num": 1, + "columns":[ + { + "name":"id_card", + "type":"int" + }, + { + "name":"tran_time", + "type":"string" + }, + { + "name":"name", + "type":"string" + }, + { + "name":"cash", + "type":"int" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.print.sink.PrintSink", + "content_type": "json", + "batch_size": 1, + "columns":[ + { + "name":"id_card", + "type":"int" + }, + { + "name":"tran_time", + "type":"string" + }, + { + "name":"name", + "type":"string" + }, + { + "name":"cash", + "type":"int" + } + ] + } + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_text.json b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_text.json new file mode 100644 index 000000000..8c0c7aa7a --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/resources/hadoop_to_print_text.json @@ -0,0 +1,56 @@ +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.print.sink.PrintSink", + "content_type": "json", + "batch_size": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/source/test.json b/bitsail-connectors/connector-hadoop/src/test/resources/source/test.json new file mode 100644 index 000000000..1eb170ed1 --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/resources/source/test.json @@ -0,0 +1 @@ 
+{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]} \ No newline at end of file diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc new file mode 100644 index 000000000..4017688d4 Binary files /dev/null and b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc differ diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet new file mode 100644 index 000000000..7a1c114a6 Binary files /dev/null and b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet differ diff --git a/bitsail-connectors/pom.xml b/bitsail-connectors/pom.xml index b9e4d7963..9a0748cfa 100644 --- a/bitsail-connectors/pom.xml +++ b/bitsail-connectors/pom.xml @@ -46,6 +46,7 @@ connector-druid connector-assert connector-selectdb + connector-hadoop diff --git a/bitsail-dist/pom.xml b/bitsail-dist/pom.xml index 285d542a1..4009c554f 100644 --- a/bitsail-dist/pom.xml +++ b/bitsail-dist/pom.xml @@ -272,6 +272,13 @@ ${revision} provided + + + com.bytedance.bitsail + connector-hadoop + ${revision} + provided + diff --git a/website/en/documents/connectors/README.md b/website/en/documents/connectors/README.md index 3e2967a2e..1ab26ce3b 100644 --- a/website/en/documents/connectors/README.md +++ b/website/en/documents/connectors/README.md @@ -16,6 +16,7 @@ dir: - [FTP/SFTP connector](ftp/ftp.md) - [FTP/SFTP-v1 connector](ftp/v1/ftp-v1.md) - [Hadoop connector](hadoop/hadoop.md) +- [Hadoop-v1 connector](hadoop/v1/hadoop-v1.md) - [HBase connector](hbase/hbase.md) - [Hive connector](hive/hive.md) - [Hudi connector](hudi/hudi.md) diff --git a/website/en/documents/connectors/hadoop/v1/hadoop-v1-example.md b/website/en/documents/connectors/hadoop/v1/hadoop-v1-example.md new file mode 100644 index 000000000..c43805e08 --- /dev/null +++ b/website/en/documents/connectors/hadoop/v1/hadoop-v1-example.md @@ -0,0 +1,104 @@ +# Hadoop connector examples + +Parent document: [hadoop-connector](./hadoop-v1.md) + +The following configuration shows how to organize parameter configuration to read the following json format hdfs file. + +- Example json data +```json +{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]} +``` + + +- The `local Hadoop environment` is used for testing, and the sample json data is uploaded to the `/test_namespace/source directory` of hdfs +- Configuration file used to read the above hdfs file: + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +The following configuration shows how to organize parameter configuration to read the following csv format hdfs file. 
+ +- Example csv data + +```csv +1,100001,100.001,text_0001,2020-01-01 +2,100002,100.002,text_0002,2020-01-02 +``` + + +- The `local Hadoop environment` is used for testing, and the sample json data is uploaded to the `/test_namespace/source directory` of hdfs +- Configuration file used to read the above hdfs file: + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"csv", + "reader_parallelism_num": 1, + "columns": [ + { + "name": "id", + "type": "long" + }, + { + "name": "int_type", + "type": "int" + }, + { + "name": "double_type", + "type": "double" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "p_date", + "type": "date" + } + ] + } + } +} +``` + diff --git a/website/en/documents/connectors/hadoop/v1/hadoop-v1.md b/website/en/documents/connectors/hadoop/v1/hadoop-v1.md new file mode 100644 index 000000000..4a1d4a375 --- /dev/null +++ b/website/en/documents/connectors/hadoop/v1/hadoop-v1.md @@ -0,0 +1,126 @@ +# Hadoop connector + +Parent document: [connectors](../../README.md) + + +## Main function + +Hadoop connector can be used to read hdfs files in batch scenarios. Its function points mainly include: + + - Support reading files in multiple hdfs directories at the same time + - Support reading hdfs files of various formats + +## Maven dependency + +```text + + com.bytedance.bitsail + connector-hadoop + ${revision} + +``` + +## Supported data types + - Basic data types supported by Hadoop connectors: + - Integer type: + - short + - int + - long + - biginterger + - Float type: + - float + - double + - bigdecimal + - Time type: + - timestamp + - date + - time + - String type: + - string + - Bool type: + - boolean + - Binary type: + - binary +- Composited data types supported by Hadoop connectors: + - map + - list + +## Parameters + +The following mentioned parameters should be added to `job.reader` block when using, for example: + +```json +{ + "job": { + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + }, + } +} +``` + +### Necessary parameters + +| Param name | Required | Optional value | Description | +| :----------- | :------- | :------------- | :----------------------------------------------------------- | +| class | Yes | | Class name of hadoop connector, v1 connector is `com.bytedance.bitsail.connector.hadoop.source.HadoopSource` | +| path_list | Yes | | Specifies the path of the read in file. Multiple paths can be specified, separated by `','` | +| content_type | Yes | JSON
CSV | Specify the format of the read in file. | +| columns | Yes | | Describing fields' names and types | + +### Optional parameters +| Param name | Required | Optional value | Description | +| :----------------------------------- | :------- | :------------- | :--------------------------------------------------------- | +| default_hadoop_parallelism_threshold | No | | The number of splits read by each reader, the default is 2 | +| reader_parallelism_num | No | | Reader parallelism | + + +## Supported format + +Support the following formats: + +- [JSON](#jump_json) +- [CSV](#jump_csv) + +### JSON +It supports parsing text files in json format. Each line is required to be a standard json string. + +### CSV +Support parsing of text files in csv format. Each line is required to be a standard csv string. + +The following parameters are supported to adjust the csv parsing style: + + +| Parameter name | Default value | Description | +|-----------------------------------|---------------|----------------------------------------------------------------------------| +| `job.common.csv_delimiter` | `','` | csv delimiter | +| `job.common.csv_escape` | | escape character | +| `job.common.csv_quote` | | quote character | +| `job.common.csv_with_null_string` | | Specify the conversion value of null field. It is not converted by default | + +---- + + +## Related document + +Configuration examples: [hadoop-v1-connector-example](./hadoop-v1-example.md) diff --git a/website/zh/documents/connectors/README.md b/website/zh/documents/connectors/README.md index 70bac6e39..3a8e14a35 100644 --- a/website/zh/documents/connectors/README.md +++ b/website/zh/documents/connectors/README.md @@ -16,6 +16,7 @@ dir: - [FTP/SFTP 连接器](ftp/ftp.md) - [FTP/SFTP-v1 连接器](ftp/v1/ftp-v1.md) - [Hadoop 连接器](hadoop/hadoop.md) +- [Hadoop-v1 连接器](hadoop/v1/hadoop-v1.md) - [HBase 连接器](hbase/hbase.md) - [Hive 连接器](hive/hive.md) - [Hudi 连接器](hudi/hudi.md) diff --git a/website/zh/documents/connectors/hadoop/v1/hadoop-v1-example.md b/website/zh/documents/connectors/hadoop/v1/hadoop-v1-example.md new file mode 100644 index 000000000..a709d0691 --- /dev/null +++ b/website/zh/documents/connectors/hadoop/v1/hadoop-v1-example.md @@ -0,0 +1,104 @@ +# Hadoop连接器使用示例 + +上级文档: [hadoop连接器](./hadoop-v1.md) + +下面展示了如何使用用户参数配置读取如下json格式hdfs文件。 + +- 示例json数据 +```json +{"id":0,"string_type":"test_string","map_string_string":{"k1":"v1","k2":"v2","k3":"v3"},"array_string":["a1","a2","a3","a4"]} +``` + + +- 测试时使用本地Hadoop环境,需自行配置,将示例json数据上传至hdfs的/test_namespace/source目录下 +- 用于读取上述格式hdfs文件的配置 + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + } + } +} +``` + +下面展示了如何使用用户参数配置读取如下csv格式hdfs文件。 + +- 示例csv数据 + +```csv +1,100001,100.001,text_0001,2020-01-01 +2,100002,100.002,text_0002,2020-01-02 +``` + + +- 测试时使用本地Hadoop环境,需自行配置,将示例json数据上传至hdfs的/test_namespace/source目录下 +- 用于读取上述格式hdfs文件的配置 + +```json +{ + "job": { + "common": { + "job_id": 313, + "instance_id": 3123, + "job_name": "bitsail_hadoop_to_print_test", + "user_name": "root" + }, + "reader": { + "class": 
"com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"csv", + "reader_parallelism_num": 1, + "columns": [ + { + "name": "id", + "type": "long" + }, + { + "name": "int_type", + "type": "int" + }, + { + "name": "double_type", + "type": "double" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "p_date", + "type": "date" + } + ] + } + } +} +``` + diff --git a/website/zh/documents/connectors/hadoop/v1/hadoop-v1.md b/website/zh/documents/connectors/hadoop/v1/hadoop-v1.md new file mode 100644 index 000000000..a3efe4587 --- /dev/null +++ b/website/zh/documents/connectors/hadoop/v1/hadoop-v1.md @@ -0,0 +1,125 @@ +# Hadoop连接器 + +上级文档: [connectors](../../README.md) + + +## 主要功能 + +Hadoop连接器可用于批式场景下的hdfs文件读取。其功能点主要包括: + + - 支持同时读取多个hdfs目录下的文件 + - 支持读取多种格式的hdfs文件 + +## 依赖引入 + +```text + + com.bytedance.bitsail + connector-hadoop + ${revision} + +``` + +## 支持的数据类型 + - 支持的基础数据类型如下: + - 整数类型: + - short + - int + - long + - bitinteger + - 浮点类型: + - float + - double + - bigdecimal + - 时间类型: + - timestamp + - date + - time + - 字符类型: + - string + - 布尔类型: + - boolean + - 二进制类型: + - binary + - 支持的复杂数据类型包括: + - map + - list + +## 主要参数 + +以下参数使用在`job.reader`配置中,实际使用时请注意路径前缀。示例: +```json +{ + "job": { + "reader": { + "class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource", + "content_type":"json", + "reader_parallelism_num": 1, + "columns": [ + { + "name":"id", + "type": "int" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "map_string_string", + "type": "map" + }, + { + "name": "array_string", + "type": "list" + } + ] + }, + } +} +``` + +### 必需参数 + +| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 | +|:-------------| :----------- | :---------- | :----------------------------------------------------------- | +| class | 是 | | Hadoop读连接器类名,v1 connector为为`com.bytedance.bitsail.connector.hadoop.source.HadoopSource` | | +| path_list | 是 | | 指定读入文件的路径。可指定多个路径,使用`','`分隔 | +| content_type | 是 | JSON
CSV | 指定读入文件的格式 | +| columns | 是 | | 数据字段名称及类型 | + +### 可选参数 +| 参数名称 | 参数是否必需 | 参数枚举值 | 参数含义 | +| :----------------------------------- | :----------- | :--------- | :-------------------------------- | +| default_hadoop_parallelism_threshold | 否 | | 每个reader读取的文件数目,默认为2 | +| reader_parallelism_num | 否 | | 读并发数,无默认值 | + + +## 支持的文件格式 + +支持对以下格式的文件进行解读: + +- [JSON](#jump_json) +- [CSV](#jump_csv) + +### JSON +支持对json格式的文本文件进行解析,要求每行均为标准的json字符串。 + +### CSV +支持对csv格式的文本文件进行解析,要求每行均为标准的csv字符串。 +支持以下参数对csv解析方式进行调整: + + +| 参数名称 | 参数默认值 | 参数说明 | +|-----------------------------------|-------|--------------------| +| `job.common.csv_delimiter` | `','` | csv分隔符 | +| `job.common.csv_escape` | | escape字符 | +| `job.common.csv_quote` | | quote字符 | +| `job.common.csv_with_null_string` | | 指定null字段的转化值,默认不转化 | + +---- + + +## 相关文档 + +配置示例文档 [hadoop v1连接器示例](./hadoop-v1-example.md) +