[BitSail#106][Connector] Migrate hadoop legacy source connector to V1 interface #298

Open · wants to merge 3 commits into master
bitsail-connector-hadoop/pom.xml
@@ -26,9 +26,6 @@

<artifactId>bitsail-connector-hadoop</artifactId>

-<properties>
-</properties>
-
<dependencies>
<dependency>
<groupId>com.bytedance.bitsail</groupId>
104 changes: 104 additions & 0 deletions bitsail-connectors/connector-hadoop/pom.xml
@@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2022 Bytedance Ltd. and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bitsail-connectors</artifactId>
<groupId>com.bytedance.bitsail</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-hadoop</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<!-- Hadoop -->
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-shaded-hadoop</artifactId>
<version>${revision}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-shaded-hive</artifactId>
<version>${revision}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>netty-common</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>netty-buffer</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-component-format-csv</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-component-format-json</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>

<!-- dependencies for test -->
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-connector-print</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>connector-print</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-connector-test</artifactId>
<version>${revision}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>hppc</artifactId>
<groupId>com.carrotsearch</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
21 changes: 21 additions & 0 deletions bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/constant/HadoopConstants.java
@@ -0,0 +1,21 @@
/*
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.hadoop.constant;

public class HadoopConstants {
public static final String HADOOP_CONNECTOR_NAME = "hadoop";
}
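
By convention, this constant is what the connector's V1 Source implementation reports as its reader name. A minimal sketch, assuming the PR's HadoopSource (not shown in this excerpt) follows the usual BitSail pattern:

@Override
public String getReaderName() {
  return HadoopConstants.HADOOP_CONNECTOR_NAME;
}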
51 changes: 51 additions & 0 deletions bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/error/HadoopErrorCode.java
@@ -0,0 +1,51 @@
/*
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.hadoop.error;

import com.bytedance.bitsail.common.exception.ErrorCode;

public enum HadoopErrorCode implements ErrorCode {

REQUIRED_VALUE("Hadoop-01", "You missed parameter which is required, please check your configuration."),
UNSUPPORTED_ENCODING("Hadoop-02", "Unsupported Encoding."),
UNSUPPORTED_COLUMN_TYPE("Hadoop-03", "Unsupported column type."),
HDFS_IO("Hadoop-04", "IO Exception.");

private final String code;
private final String description;

HadoopErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}

@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code,
this.description);
}
}
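
These codes surface through BitSailException and are formatted by the toString() above. A minimal sketch of a typical call site; the file-system handle and path are hypothetical, only the exception idiom mirrors the code in this PR:

try {
  fileSystem.open(path); // hypothetical HDFS read while opening a split
} catch (IOException e) {
  throw BitSailException.asBitSailException(HadoopErrorCode.HDFS_IO, e.getMessage());
}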
148 changes: 148 additions & 0 deletions bitsail-connectors/connector-hadoop/src/main/java/com/bytedance/bitsail/connector/hadoop/format/HiveInputFormatDeserializationSchema.java
@@ -0,0 +1,148 @@
/*
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.hadoop.format;

import com.bytedance.bitsail.base.format.DeserializationSchema;
import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.model.ColumnInfo;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;

import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class HiveInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {
private final BitSailConfiguration deserializationConfiguration;
private final TypeInfo<?>[] typeInfos;
private final String[] fieldNames;
private final StructObjectInspector inspector;

public HiveInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
TypeInfo<?>[] typeInfos,
String[] fieldNames) {
this.deserializationConfiguration = deserializationConfiguration;
this.typeInfos = typeInfos;
this.fieldNames = fieldNames;

List<ColumnInfo> columnInfos = deserializationConfiguration.get(HadoopReaderOptions.COLUMNS);
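// The Hive SerDe builds its ObjectInspector from the "columns" and
// "columns.types" properties, so derive both from the configured column list.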
Properties p = new Properties();
String columns = columnInfos.stream().map(ColumnInfo::getName).collect(Collectors.joining(","));
String columnsTypes = columnInfos.stream().map(ColumnInfo::getType).collect(Collectors.joining(":"));
p.setProperty("columns", columns);
p.setProperty("columns.types", columnsTypes);
String inputFormatClass = deserializationConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS);
try {
switch (inputFormatClass) {
case "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat": {
OrcSerde serde = new OrcSerde();
serde.initialize(new JobConf(), p);
this.inspector = (StructObjectInspector) serde.getObjectInspector();
break;
}
case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat": {
ParquetHiveSerDe serde = new ParquetHiveSerDe();
serde.initialize(new JobConf(), p);
this.inspector = (StructObjectInspector) serde.getObjectInspector();
break;
}
default:
throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported input format class: " + inputFormatClass);
}
} catch (SerDeException e) {
throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_COLUMN_TYPE, "unsupported column information.");
}
}

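// Convert one input record into a BitSail Row by unwrapping each struct field.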
@Override
public Row deserialize(Writable message) {
int arity = fieldNames.length;
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
Row row = new Row(arity);
for (int i = 0; i < arity; ++i) {
Object writableData = inspector.getStructFieldData(message, fields.get(i));
row.setField(i, getWritableValue(writableData));
}
return row;
}

@Override
public boolean isEndOfStream(Row nextElement) {
return false;
}

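// Unwrap a Hadoop/Hive Writable into the corresponding plain Java value;
// unknown Writable types fall back to their string representation.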
private Object getWritableValue(Object writable) {
Object ret;

if (writable == null) {
ret = null;
} else if (writable instanceof IntWritable) {
ret = ((IntWritable) writable).get();
} else if (writable instanceof Text) {
ret = writable.toString();
} else if (writable instanceof LongWritable) {
ret = ((LongWritable) writable).get();
} else if (writable instanceof ByteWritable) {
ret = ((ByteWritable) writable).get();
} else if (writable instanceof DateWritable) {
ret = ((DateWritable) writable).get();
} else if (writable instanceof DoubleWritable) {
ret = ((DoubleWritable) writable).get();
} else if (writable instanceof TimestampWritable) {
ret = ((TimestampWritable) writable).getTimestamp();
} else if (writable instanceof FloatWritable) {
ret = ((FloatWritable) writable).get();
} else if (writable instanceof BooleanWritable) {
ret = ((BooleanWritable) writable).get();
} else if (writable instanceof BytesWritable) {
BytesWritable bytesWritable = (BytesWritable) writable;
byte[] bytes = bytesWritable.getBytes();
ret = new byte[bytesWritable.getLength()];
System.arraycopy(bytes, 0, ret, 0, bytesWritable.getLength());
} else if (writable instanceof HiveDecimalWritable) {
ret = ((HiveDecimalWritable) writable).getHiveDecimal().bigDecimalValue();
} else if (writable instanceof ShortWritable) {
ret = ((ShortWritable) writable).get();
} else {
ret = writable.toString();
}
return ret;
}
}
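
For context, a minimal wiring sketch of how this schema can be driven end to end. The HadoopReaderOptions keys and the constructor signature come from this diff; BitSailConfiguration.newDefault(), the TypeInfos constants, and the two-argument ColumnInfo constructor are assumed from common BitSail usage, and the input Writable would come from the Hadoop RecordReader:

import com.bytedance.bitsail.base.format.DeserializationSchema;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.model.ColumnInfo;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
import com.bytedance.bitsail.common.typeinfo.TypeInfos;
import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;

import org.apache.hadoop.io.Writable;

import java.util.Arrays;

public class HiveDeserializationSketch {
  public Row convert(Writable record) {
    BitSailConfiguration conf = BitSailConfiguration.newDefault();
    conf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS,
        "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
    conf.set(HadoopReaderOptions.COLUMNS, Arrays.asList(
        new ColumnInfo("id", "bigint"),   // assumed (name, type) constructor
        new ColumnInfo("name", "string")));

    TypeInfo<?>[] typeInfos = {TypeInfos.LONG_TYPE_INFO, TypeInfos.STRING_TYPE_INFO};
    String[] fieldNames = {"id", "name"};

    DeserializationSchema<Writable, Row> schema =
        new HiveInputFormatDeserializationSchema(conf, typeInfos, fieldNames);
    return schema.deserialize(record); // record: one Writable from the ORC RecordReader
  }
}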