[BitSail#106][Connector] Migrate hadoop source connector to v1 interface & support more InputFormat.
love-star committed Jan 31, 2023
1 parent ddb35af commit ceef217
Showing 23 changed files with 647 additions and 190 deletions.
20 changes: 20 additions & 0 deletions bitsail-connectors/connector-hadoop/pom.xml
@@ -148,6 +148,26 @@
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.bytedance.bitsail</groupId>
      <artifactId>bitsail-shaded-hive</artifactId>
      <version>${revision}</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.ant</groupId>
          <artifactId>ant</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>log4j</artifactId>
          <groupId>log4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-net</artifactId>
          <groupId>commons-net</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.bytedance.bitsail</groupId>
      <artifactId>bitsail-connector-test</artifactId>
@@ -18,6 +18,4 @@

public class HadoopConstants {
  public static String HADOOP_CONNECTOR_NAME = "hadoop";
  public static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
  public static final String SCHEMA = "hdfs";
}
@@ -0,0 +1,148 @@
/*
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.hadoop.format;

import com.bytedance.bitsail.base.format.DeserializationSchema;
import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.model.ColumnInfo;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;

import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class HiveInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {
  private final BitSailConfiguration deserializationConfiguration;
  private final TypeInfo<?>[] typeInfos;
  private final String[] fieldNames;
  private final StructObjectInspector inspector;

  public HiveInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                              TypeInfo<?>[] typeInfos,
                                              String[] fieldNames) {

    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;

    List<ColumnInfo> columnInfos = deserializationConfiguration.get(HadoopReaderOptions.COLUMNS);
    Properties p = new Properties();
    String columns = columnInfos.stream().map(ColumnInfo::getName).collect(Collectors.joining(","));
    String columnsTypes = columnInfos.stream().map(ColumnInfo::getType).collect(Collectors.joining(":"));
    p.setProperty("columns", columns);
    p.setProperty("columns.types", columnsTypes);
    String inputFormatClass = deserializationConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS);
    try {
      switch (inputFormatClass) {
        case "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat": {
          OrcSerde serde = new OrcSerde();
          serde.initialize(new JobConf(), p);
          this.inspector = (StructObjectInspector) serde.getObjectInspector();
          break;
        }
        case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat": {
          ParquetHiveSerDe serde = new ParquetHiveSerDe();
          serde.initialize(new JobConf(), p);
          this.inspector = (StructObjectInspector) serde.getObjectInspector();
          break;
        }
        default:
          throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported input format class: " + inputFormatClass);
      }
    } catch (SerDeException e) {
      throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_COLUMN_TYPE, "unsupported column information.");
    }
  }

  @Override
  public Row deserialize(Writable message) {
    int arity = fieldNames.length;
    List<? extends StructField> fields = inspector.getAllStructFieldRefs();
    Row row = new Row(arity);
    for (int i = 0; i < arity; ++i) {
      Object writableData = inspector.getStructFieldData(message, fields.get(i));
      row.setField(i, getWritableValue(writableData));
    }
    return row;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }

  private Object getWritableValue(Object writable) {
    Object ret;

    if (writable == null) {
      ret = null;
    } else if (writable instanceof IntWritable) {
      ret = ((IntWritable) writable).get();
    } else if (writable instanceof Text) {
      ret = writable.toString();
    } else if (writable instanceof LongWritable) {
      ret = ((LongWritable) writable).get();
    } else if (writable instanceof ByteWritable) {
      ret = ((ByteWritable) writable).get();
    } else if (writable instanceof DateWritable) {
      ret = ((DateWritable) writable).get();
    } else if (writable instanceof DoubleWritable) {
      ret = ((DoubleWritable) writable).get();
    } else if (writable instanceof TimestampWritable) {
      ret = ((TimestampWritable) writable).getTimestamp();
    } else if (writable instanceof FloatWritable) {
      ret = ((FloatWritable) writable).get();
    } else if (writable instanceof BooleanWritable) {
      ret = ((BooleanWritable) writable).get();
    } else if (writable instanceof BytesWritable) {
      BytesWritable bytesWritable = (BytesWritable) writable;
      byte[] bytes = bytesWritable.getBytes();
      ret = new byte[bytesWritable.getLength()];
      System.arraycopy(bytes, 0, ret, 0, bytesWritable.getLength());
    } else if (writable instanceof HiveDecimalWritable) {
      ret = ((HiveDecimalWritable) writable).getHiveDecimal().bigDecimalValue();
    } else if (writable instanceof ShortWritable) {
      ret = ((ShortWritable) writable).get();
    } else {
      ret = writable.toString();
    }
    return ret;
  }
}
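The Hive-format path above only needs the configured column list and the target InputFormat class. Below is a minimal wiring sketch, not part of this commit, assuming the imports from the file above plus java.util.Arrays and com.bytedance.bitsail.common.typeinfo.TypeInfos, and assuming BitSailConfiguration.newDefault(), BitSailConfiguration.set(option, value) and the ColumnInfo(name, type) constructor behave as in bitsail-common:

// Hypothetical usage sketch for the ORC branch; the helper constants and constructors are assumptions.
BitSailConfiguration conf = BitSailConfiguration.newDefault();
// COLUMNS feeds the "columns" / "columns.types" SerDe properties assembled in the constructor.
conf.set(HadoopReaderOptions.COLUMNS, Arrays.asList(
    new ColumnInfo("id", "bigint"),
    new ColumnInfo("name", "string")));
// Choosing OrcInputFormat makes the schema initialize an OrcSerde and take its ObjectInspector.
conf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS,
    "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");

TypeInfo<?>[] typeInfos = {TypeInfos.LONG_TYPE_INFO, TypeInfos.STRING_TYPE_INFO};
String[] fieldNames = {"id", "name"};

HiveInputFormatDeserializationSchema schema =
    new HiveInputFormatDeserializationSchema(conf, typeInfos, fieldNames);
// schema.deserialize(writableRecord) then maps each Hive field Writable onto a BitSail Row.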
@@ -0,0 +1,70 @@
/*
* Copyright 2022 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.hadoop.format;

import com.bytedance.bitsail.base.enumerate.ContentType;
import com.bytedance.bitsail.base.format.DeserializationSchema;
import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema;
import com.bytedance.bitsail.component.format.json.JsonDeserializationSchema;
import com.bytedance.bitsail.connector.hadoop.error.HadoopErrorCode;
import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions;

import org.apache.hadoop.io.Writable;

public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {
  private final BitSailConfiguration deserializationConfiguration;
  private final TypeInfo<?>[] typeInfos;
  private final String[] fieldNames;
  private final DeserializationSchema<byte[], Row> deserializationSchema;

  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                              TypeInfo<?>[] typeInfos,
                                              String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;

    ContentType contentType = ContentType.valueOf(
        deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
    switch (contentType) {
      case CSV:
        this.deserializationSchema =
            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      case JSON:
        this.deserializationSchema =
            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      default:
        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType);
    }
  }

  @Override
  public Row deserialize(Writable message) {
    return deserializationSchema.deserialize((message.toString()).getBytes());
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }
}
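For the plain-text path, the content type alone decides which parser is plugged in. A companion sketch under the same assumptions as above, reusing typeInfos and fieldNames from the previous snippet (the CONTENT_TYPE key is defined in HadoopReaderOptions further down):

// Hypothetical sketch: "csv" is upper-cased to ContentType.CSV, so CsvDeserializationSchema is used.
BitSailConfiguration conf = BitSailConfiguration.newDefault();
conf.set(HadoopReaderOptions.CONTENT_TYPE, "csv");

TextInputFormatDeserializationSchema schema =
    new TextInputFormatDeserializationSchema(conf, typeInfos, fieldNames);
// Each record read by TextInputFormat arrives as a Text Writable; deserialize() turns it into
// bytes and delegates to the CSV parser, which maps the delimited columns onto a BitSail Row.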
@@ -25,15 +25,10 @@

public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions {
  @Essential
  ConfigOption<String> DEFAULT_FS =
      key(READER_PREFIX + "defaultFS")
          .noDefaultValue(String.class);
  @Essential
  ConfigOption<String> PATH_LIST =
      key(READER_PREFIX + "path_list")
          .noDefaultValue(String.class);

  @Essential
  ConfigOption<String> CONTENT_TYPE =
      key(READER_PREFIX + "content_type")
          .noDefaultValue(String.class);
@@ -45,4 +40,8 @@ public interface HadoopReaderOptions extends ReaderOptions.BaseReaderOptions {
  ConfigOption<Integer> DEFAULT_HADOOP_PARALLELISM_THRESHOLD =
      key(READER_PREFIX + "default_hadoop_parallelism_threshold")
          .defaultValue(2);
}

  ConfigOption<String> HADOOP_INPUT_FORMAT_CLASS =
      key(READER_PREFIX + "hadoop_inputformat_class")
          .defaultValue("org.apache.hadoop.mapred.TextInputFormat");
}
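Since hadoop_inputformat_class defaults to org.apache.hadoop.mapred.TextInputFormat, existing text-based jobs keep their behavior when the key is omitted. A small hedged illustration, assuming a ConfigOption's default is returned by get() when the key is unset:

// With no explicit hadoop_inputformat_class setting under READER_PREFIX, the default is returned:
String inputFormatClass = readerConfiguration.get(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS);
// -> "org.apache.hadoop.mapred.TextInputFormat", which is handled by
//    TextInputFormatDeserializationSchema; the ORC/Parquet classes go through the Hive schema.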
@@ -42,7 +42,7 @@
import java.util.Arrays;
import java.util.List;

public class HadoopSource implements Source<Row, HadoopSourceSplit, EmptyState>, ParallelismComputable {
public class HadoopSource<K, V> implements Source<Row, HadoopSourceSplit, EmptyState>, ParallelismComputable {
  private static final Logger LOG = LoggerFactory.getLogger(HadoopSource.class);

  private BitSailConfiguration readerConfiguration;
@@ -61,12 +61,12 @@ public Boundedness getSourceBoundedness() {

  @Override
  public SourceReader<Row, HadoopSourceSplit> createReader(SourceReader.Context readerContext) {
    return new HadoopSourceReader(readerConfiguration, readerContext);
    return new HadoopSourceReader<K, V>(readerConfiguration, readerContext, hadoopPathList);
  }

  @Override
  public SourceSplitCoordinator<HadoopSourceSplit, EmptyState> createSplitCoordinator(SourceSplitCoordinator.Context<HadoopSourceSplit, EmptyState> coordinatorContext) {
    return new HadoopSourceSplitCoordinator(readerConfiguration, coordinatorContext, hadoopPathList);
    return new HadoopSourceSplitCoordinator<K, V>(readerConfiguration, coordinatorContext, hadoopPathList);
  }

  @Override

This file was deleted.
