[BitSail#106][Connector] Migrate hadoop source connector to v1 interface.
love-star committed Jan 31, 2023
1 parent 2797644 commit ddb35af
Showing 19 changed files with 1,369 additions and 0 deletions.
175 changes: 175 additions & 0 deletions bitsail-connectors/connector-hadoop/pom.xml
@@ -0,0 +1,175 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2022 Bytedance Ltd. and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bitsail-connectors</artifactId>
<groupId>com.bytedance.bitsail</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-hadoop</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<exclusions>
<exclusion>
<artifactId>guice-servlet</artifactId>
<groupId>com.google.inject.extensions</groupId>
</exclusion>
<exclusion>
<artifactId>guice</artifactId>
<groupId>com.google.inject</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-guice</artifactId>
<groupId>com.sun.jersey.contribs</groupId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<artifactId>metrics-core</artifactId>
<groupId>com.codahale.metrics</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-component-format-csv</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-component-format-json</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>

<!-- dependencies for test -->
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-connector-print</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>connector-print</artifactId>
<version>${revision}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-connector-test</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>guice</artifactId>
<groupId>com.google.inject</groupId>
</exclusion>
<exclusion>
<artifactId>guice-servlet</artifactId>
<groupId>com.google.inject.extensions</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
23 changes: 23 additions & 0 deletions bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java
@@ -0,0 +1,23 @@
/*
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.hadoop.constant;

public class HadoopConstants {
public static final String HADOOP_CONNECTOR_NAME = "hadoop";
public static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
public static final String SCHEMA = "hdfs";
}
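
For context only, a minimal sketch (not part of this commit) of how HDFS_IMPL and a filesystem address are typically wired into a Hadoop Configuration; the namenode URI and path below are placeholders.

import com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopConstantsUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Register the DistributedFileSystem implementation for the hdfs:// scheme.
    conf.set("fs.hdfs.impl", HadoopConstants.HDFS_IMPL);
    conf.set("fs.defaultFS", "hdfs://namenode:9000"); // placeholder address
    FileSystem fs = FileSystem.get(conf);
    System.out.println(fs.exists(new Path("/data/input/part-00000"))); // placeholder path
  }
}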
51 changes: 51 additions & 0 deletions bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java
@@ -0,0 +1,51 @@
/*
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.hadoop.error;

import com.bytedance.bitsail.common.exception.ErrorCode;

public enum HadoopErrorCode implements ErrorCode {

REQUIRED_VALUE("Hadoop-01", "You missed parameter which is required, please check your configuration."),
UNSUPPORTED_ENCODING("Hadoop-02", "Unsupported Encoding."),
UNSUPPORTED_COLUMN_TYPE("Hadoop-03", "Unsupported column type."),
HDFS_IO("Hadoop-04", "IO Exception.");

private final String code;
private final String description;

HadoopErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}

@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code,
this.description);
}
}
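
A hedged sketch (not from this commit) of how these codes are usually raised, assuming the BitSailException.asBitSailException(ErrorCode, Throwable) factory from bitsail-common; the file-probing helper is a hypothetical illustration.

import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class HadoopErrorCodeUsageSketch {
  // Hypothetical helper: wrap low-level I/O failures with the connector's error code.
  static void probe(FileSystem fs, Path path) {
    try {
      fs.open(path).close();
    } catch (IOException e) {
      throw BitSailException.asBitSailException(HadoopErrorCode.HDFS_IO, e);
    }
  }
}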
48 changes: 48 additions & 0 deletions bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/option/HadoopReaderOptions.java
@@ -0,0 +1,48 @@
/*
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.hadoop.option;

import com.bytedance.bitsail.common.annotation.Essential;
import com.bytedance.bitsail.common.option.ConfigOption;
import com.bytedance.bitsail.common.option.ReaderOptions;

import static com.bytedance.bitsail.common.option.ConfigOptions.key;
import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX;

public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions {
@Essential
ConfigOption<String> DEFAULT_FS =
key(READER_PREFIX + "defaultFS")
.noDefaultValue(String.class);
@Essential
ConfigOption<String> PATH_LIST =
key(READER_PREFIX + "path_list")
.noDefaultValue(String.class);

@Essential
ConfigOption<String> CONTENT_TYPE =
key(READER_PREFIX + "content_type")
.noDefaultValue(String.class);

ConfigOption<Integer> READER_PARALLELISM_NUM =
key(READER_PREFIX + "reader_parallelism_num")
.noDefaultValue(Integer.class);

ConfigOption<Integer> DEFAULT_HADOOP_PARALLELISM_THRESHOLD =
key(READER_PREFIX + "default_hadoop_parallelism_threshold")
.defaultValue(2);
}
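
For illustration, a minimal sketch (not part of this commit) of setting these options programmatically, assuming BitSailConfiguration.newDefault() and set(ConfigOption, value) from bitsail-common; in a real job the same keys would normally appear under the reader section of the JSON job configuration, and all values below are placeholders.

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;

public class HadoopReaderOptionsSketch {
  public static void main(String[] args) {
    BitSailConfiguration jobConf = BitSailConfiguration.newDefault();
    jobConf.set(HadoopReaderOptions.DEFAULT_FS, "hdfs://namenode:9000");   // placeholder namenode
    jobConf.set(HadoopReaderOptions.PATH_LIST, "/data/a.csv,/data/b.csv"); // comma-separated paths
    jobConf.set(HadoopReaderOptions.CONTENT_TYPE, "csv");                  // or "json"
    jobConf.set(HadoopReaderOptions.READER_PARALLELISM_NUM, 2);            // optional override
  }
}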
97 changes: 97 additions & 0 deletions bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSource.java
@@ -0,0 +1,97 @@
/*
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.hadoop.source;

import com.bytedance.bitsail.base.connector.reader.v1.Boundedness;
import com.bytedance.bitsail.base.connector.reader.v1.Source;
import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState;
import com.bytedance.bitsail.base.execution.ExecutionEnviron;
import com.bytedance.bitsail.base.extension.ParallelismComputable;
import com.bytedance.bitsail.base.parallelism.ParallelismAdvice;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter;
import com.bytedance.bitsail.common.type.TypeInfoConverter;
import com.bytedance.bitsail.connector.hadoop.constant.HadoopConstants;
import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;
import com.bytedance.bitsail.connector.hadoop.source.coordinator.HadoopSourceSplitCoordinator;
import com.bytedance.bitsail.connector.hadoop.source.reader.HadoopSourceReader;
import com.bytedance.bitsail.connector.hadoop.source.split.HadoopSourceSplit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class HadoopSource implements Source<Row, HadoopSourceSplit, EmptyState>, ParallelismComputable {
private static final Logger LOG = LoggerFactory.getLogger(HadoopSource.class);

private BitSailConfiguration readerConfiguration;
private List<String> hadoopPathList;

@Override
public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException {
this.readerConfiguration = readerConfiguration;
hadoopPathList = Arrays.asList(readerConfiguration.getNecessaryOption(HadoopReaderOptions.PATH_LIST, HadoopErrorCode.REQUIRED_VALUE).split(","));
}

@Override
public Boundedness getSourceBoundedness() {
return Boundedness.BOUNDEDNESS;
}

@Override
public SourceReader<Row, HadoopSourceSplit> createReader(SourceReader.Context readerContext) {
return new HadoopSourceReader(readerConfiguration, readerContext);
}

@Override
public SourceSplitCoordinator<HadoopSourceSplit, EmptyState> createSplitCoordinator(SourceSplitCoordinator.Context<HadoopSourceSplit, EmptyState> coordinatorContext) {
return new HadoopSourceSplitCoordinator(readerConfiguration, coordinatorContext, hadoopPathList);
}

@Override
public String getReaderName() {
return HadoopConstants.HADOOP_CONNECTOR_NAME;
}

@Override
public TypeInfoConverter createTypeInfoConverter() {
return new BitSailTypeInfoConverter();
}

@Override
public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception {
int adviceParallelism;
if (selfConf.fieldExists(HadoopReaderOptions.READER_PARALLELISM_NUM)) {
adviceParallelism = selfConf.get(HadoopReaderOptions.READER_PARALLELISM_NUM);
} else {
int parallelismThreshold = selfConf.get(HadoopReaderOptions.DEFAULT_HADOOP_PARALLELISM_THRESHOLD);
adviceParallelism = Math.max(hadoopPathList.size() / parallelismThreshold, 1);
}
LOG.info("Parallelism for this job will set to {}.", adviceParallelism);
return ParallelismAdvice.builder()
.adviceParallelism(adviceParallelism)
.enforceDownStreamChain(true)
.build();
}
}
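
A small worked example of getParallelismAdvice above, with illustrative numbers (not part of the commit): five paths and the default threshold of 2 give an advised parallelism of 2, unless reader_parallelism_num is set explicitly.

public class ParallelismAdviceSketch {
  public static void main(String[] args) {
    int pathCount = 5;            // e.g. five comma-separated entries in path_list
    int parallelismThreshold = 2; // default of default_hadoop_parallelism_threshold
    int adviceParallelism = Math.max(pathCount / parallelismThreshold, 1);
    System.out.println(adviceParallelism); // prints 2
  }
}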