Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support hive catalog to read json format tables. #54884

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,8 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
} else if (format == THdfsFileFormat::TEXT) {
scanner = new HdfsTextScanner();
} else if ((format == THdfsFileFormat::AVRO || format == THdfsFileFormat::RC_BINARY ||
format == THdfsFileFormat::RC_TEXT || format == THdfsFileFormat::SEQUENCE_FILE) &&
format == THdfsFileFormat::RC_TEXT || format == THdfsFileFormat::SEQUENCE_FILE ||
format == THdfsFileFormat::JSON_TEXT) &&
(dynamic_cast<const HdfsTableDescriptor*>(_hive_table) != nullptr ||
dynamic_cast<const FileTableDescriptor*>(_hive_table) != nullptr)) {
scanner = create_hive_jni_scanner(jni_scanner_create_options).release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class HiveClassNames {
public static final String SEQUENCE_INPUT_FORMAT_CLASS =
"org.apache.hadoop.mapred.SequenceFileInputFormat";

public static final String TEXT_JSON_SERDE_CLASS = "org.apache.hive.hcatalog.data.JsonSerDe";

public static final String SEQUENCE_OUTPUT_FORMAT_CLASS =
"org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ public static HiveTable toHiveTable(Table table, String catalogName) {
HiveStorageFormat.get(fromHdfsInputFormatClass(table.getSd().getInputFormat()).name())))
.setSerdeProperties(toSerDeProperties(table))
.setStorageFormat(
HiveStorageFormat.get(fromHdfsInputFormatClass(table.getSd().getInputFormat()).name()))
HiveStorageFormat.get(
fromHdfsInputFormatClass(table.getSd().getInputFormat()).name(),
table.getSd().getSerdeInfo().getSerializationLib()))
.setCreateTime(table.getCreateTime())
.setHiveTableType(HiveTable.HiveTableType.fromString(table.getTableType()));

Expand Down Expand Up @@ -359,7 +361,8 @@ public static Partition toPartition(StorageDescriptor sd, Map<String, String> pa
Partition.Builder partitionBuilder = Partition.builder()
.setParams(params)
.setFullPath(sd.getLocation())
.setInputFormat(toRemoteFileInputFormat(sd.getInputFormat()))
.setInputFormat(toRemoteFileInputFormat(sd.getInputFormat(),
sd.getSerdeInfo().getSerializationLib()))
.setTextFileFormatDesc(toTextFileFormatDesc(textFileParameters))
.setSplittable(RemoteFileInputFormat.isSplittable(sd.getInputFormat()));

Expand Down Expand Up @@ -580,6 +583,16 @@ public static RemoteFileInputFormat toRemoteFileInputFormat(String inputFormat)
return RemoteFileInputFormat.fromHdfsInputFormatClass(inputFormat);
}

/**
 * Resolves the {@link RemoteFileInputFormat} from the Hive input-format class name,
 * additionally distinguishing JSON tables: Hive stores them as TEXTFILE with the
 * HCatalog {@code JsonSerDe}, so the serde class is needed to tell them apart.
 *
 * @param inputFormat      fully-qualified Hive input-format class name
 * @param serializationLib fully-qualified serde class name from the storage descriptor;
 *                         may be null for tables with incomplete metastore metadata
 * @return {@code JSONTEXT} for a text table backed by the JSON serde, otherwise the
 *         format resolved from {@code inputFormat}
 */
public static RemoteFileInputFormat toRemoteFileInputFormat(String inputFormat, String serializationLib) {
    RemoteFileInputFormat storageFormat = toRemoteFileInputFormat(inputFormat);
    // Constant-first equals: SerDeInfo.getSerializationLib() can be null, and
    // serializationLib.equals(...) would throw NPE in that case.
    if (storageFormat == RemoteFileInputFormat.TEXTFILE
            && HiveClassNames.TEXT_JSON_SERDE_CLASS.equals(serializationLib)) {
        return RemoteFileInputFormat.JSONTEXT;
    }
    return storageFormat;
}

public static TextFileFormatDesc toTextFileFormatDesc(Map<String, String> parameters) {
final String DEFAULT_FIELD_DELIM = "\001";
final String DEFAULT_COLLECTION_DELIM = "\002";
zhuxiangyi marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static com.starrocks.connector.hive.HiveClassNames.SEQUENCE_INPUT_FORMAT_CLASS;
import static com.starrocks.connector.hive.HiveClassNames.SEQUENCE_OUTPUT_FORMAT_CLASS;
import static com.starrocks.connector.hive.HiveClassNames.TEXT_INPUT_FORMAT_CLASS;
import static com.starrocks.connector.hive.HiveClassNames.TEXT_JSON_SERDE_CLASS;
import static com.starrocks.connector.hive.HiveMetastoreOperations.FILE_FORMAT;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -72,6 +73,11 @@ public enum HiveStorageFormat {
SEQUENCE_INPUT_FORMAT_CLASS,
SEQUENCE_OUTPUT_FORMAT_CLASS
),
JSONTEXT(
TEXT_JSON_SERDE_CLASS,
TEXT_INPUT_FORMAT_CLASS,
HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS
),
UNSUPPORTED("UNSUPPORTED", "UNSUPPORTED", "UNSUPPORTED");

private final String serde;
Expand All @@ -87,6 +93,19 @@ public static HiveStorageFormat get(String format) {
return UNSUPPORTED;
}

/**
 * Looks up the storage format by name, additionally distinguishing JSON tables:
 * a TEXTFILE table whose serde is the HCatalog {@code JsonSerDe} is reported as
 * {@code JSONTEXT}.
 *
 * @param format           storage-format name, matched case-insensitively against enum names
 * @param serializationLib fully-qualified serde class name; may be null for tables with
 *                         incomplete metastore metadata
 * @return the matching format, {@code JSONTEXT} for JSON-serde text tables, or
 *         {@code UNSUPPORTED} when the name matches nothing
 */
public static HiveStorageFormat get(String format, String serializationLib) {
    for (HiveStorageFormat storageFormat : HiveStorageFormat.values()) {
        if (storageFormat.name().equalsIgnoreCase(format)) {
            // Constant-first equals avoids NPE when serializationLib is null.
            // TEXT_JSON_SERDE_CLASS is statically imported in this file, matching
            // the unqualified usage in the enum constant declarations.
            if (storageFormat == TEXTFILE && TEXT_JSON_SERDE_CLASS.equals(serializationLib)) {
                return JSONTEXT;
            }
            return storageFormat;
        }
    }
    return UNSUPPORTED;
}

public static void check(Map<String, String> properties) {
if (properties.containsKey("format") && !properties.containsKey(FILE_FORMAT)) {
throw new StarRocksConnectorException(
zhuxiangyi marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public enum RemoteFileInputFormat {
RCBINARY,
RCTEXT,
SEQUENCE,
JSONTEXT,
UNKNOWN;
private static final ImmutableMap<String, RemoteFileInputFormat> CLASS_NAME_TO_INPUT_FORMAT =
new ImmutableMap.Builder<String, RemoteFileInputFormat>()
Expand Down Expand Up @@ -97,6 +98,8 @@ public THdfsFileFormat toThrift() {
return THdfsFileFormat.RC_TEXT;
case SEQUENCE:
return THdfsFileFormat.SEQUENCE_FILE;
case JSONTEXT:
return THdfsFileFormat.JSON_TEXT;
default:
return THdfsFileFormat.UNKNOWN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void testCreateExternalTableWithStorageFormat(@Mocked MetadataMgr metadat
targetFormats.add("RCBINARY");
targetFormats.add("RCTEXT");
targetFormats.add("SEQUENCE");

targetFormats.add("JSONTEXT");
for (String targetFormat : targetFormats) {
HiveTable oTable = createExternalTableByFormat(targetFormat);
String inputFormatClass = HiveStorageFormat.get(targetFormat).getInputFormat();
Expand Down Expand Up @@ -290,6 +290,8 @@ public void testCreateExternalTableWithStorageFormat(@Mocked MetadataMgr metadat
TTableDescriptor tTableDescriptor = hiveTable.toThrift(partitions);

Assert.assertEquals(tTableDescriptor.getHdfsTable().getInput_format(), inputFormatClass);
Assert.assertNotEquals(tTableDescriptor.getHdfsTable().getInput_format(),"UNSUPPORTED");
Assert.assertNotEquals(tTableDescriptor.getHdfsTable().getSerde_lib(),"UNSUPPORTED");
Assert.assertEquals(tTableDescriptor.getHdfsTable().getSerde_lib(), serde);
Assert.assertEquals(tTableDescriptor.getHdfsTable().getHive_column_names(), "col2");
Assert.assertEquals(tTableDescriptor.getHdfsTable().getHive_column_types(), "INT");
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/Descriptors.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ enum THdfsFileFormat {
PARQUET = 5,
ORC = 6,
SEQUENCE_FILE = 7,
JSON_TEXT = 8,

UNKNOWN = 100
}
Expand Down
Loading