Use Map instead of Properties for Hive schema #19711

Merged · 3 commits · Nov 14, 2023
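
The change replaces java.util.Properties with Map<String, String> everywhere the Hive connector carries a table or partition schema: split loading, the page source and file writer factory interfaces, HiveSplit and InternalHiveSplit, the writer factory, and the ACID schema helpers. A minimal before/after sketch of the pattern; the key names are written as plain strings for illustration, standing in for the connector's LIST_COLUMNS and LIST_COLUMN_TYPES constants:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class SchemaMigrationSketch
{
    // Illustrative stand-ins for the connector's LIST_COLUMNS / LIST_COLUMN_TYPES constants.
    private static final String LIST_COLUMNS = "columns";
    private static final String LIST_COLUMN_TYPES = "columns.types";

    // Old shape: Properties is a Hashtable<Object, Object> subclass, weakly typed and always mutable.
    static Properties legacySchema()
    {
        Properties schema = new Properties();
        schema.setProperty(LIST_COLUMNS, "id,name");
        schema.setProperty(LIST_COLUMN_TYPES, "bigint:string");
        return schema;
    }

    // New shape: an explicit String-to-String map, which can also be made immutable.
    static Map<String, String> mapSchema()
    {
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put(LIST_COLUMNS, "id,name");
        schema.put(LIST_COLUMN_TYPES, "bigint:string");
        return schema;
    }
}
```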
@@ -66,7 +66,6 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -389,7 +388,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
{
HivePartition hivePartition = partition.getHivePartition();
String partitionName = hivePartition.getPartitionId();
Properties schema = partition.getPartition()
Map<String, String> schema = partition.getPartition()
.map(value -> getHiveSchema(value, table))
.orElseGet(() -> getHiveSchema(table));
List<HivePartitionKey> partitionKeys = getPartitionKeys(table, partition.getPartition());
@@ -19,9 +19,9 @@
import io.trino.spi.connector.ConnectorSession;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;

public interface HiveFileWriterFactory
{
@@ -30,7 +30,7 @@ Optional<FileWriter> createFileWriter(
List<String> inputColumnNames,
StorageFormat storageFormat,
HiveCompressionCodec compressionCodec,
Properties schema,
Map<String, String> schema,
ConnectorSession session,
OptionalInt bucketNumber,
AcidTransaction transaction,
@@ -142,7 +142,6 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -1934,7 +1933,7 @@ private List<String> computeFileNamesForMissingBuckets(

private void createEmptyFiles(ConnectorSession session, Location path, Table table, Optional<Partition> partition, List<String> fileNames)
{
Properties schema;
Map<String, String> schema;
StorageFormat format;
if (partition.isPresent()) {
schema = getHiveSchema(partition.get(), table);
@@ -19,9 +19,9 @@
import io.trino.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;

public interface HivePageSourceFactory
{
@@ -31,7 +31,7 @@ Optional<ReaderPageSource> createPageSource(
long start,
long length,
long estimatedFileSize,
Properties schema,
Map<String, String> schema,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
Optional<AcidInfo> acidInfo,
@@ -49,7 +49,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

@@ -173,7 +172,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
long start,
long length,
long estimatedFileSize,
Properties schema,
Map<String, String> schema,
TupleDomain<HiveColumnHandle> effectivePredicate,
TypeManager typeManager,
Optional<BucketConversion> bucketConversion,
@@ -24,9 +24,9 @@
import io.trino.spi.connector.ConnectorSplit;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
@@ -46,7 +46,7 @@ public class HiveSplit
private final long length;
private final long estimatedFileSize;
private final long fileModifiedTime;
private final Properties schema;
private final Map<String, String> schema;
private final List<HivePartitionKey> partitionKeys;
private final List<HostAddress> addresses;
private final String partitionName;
@@ -67,7 +67,7 @@ public HiveSplit(
@JsonProperty("length") long length,
@JsonProperty("estimatedFileSize") long estimatedFileSize,
@JsonProperty("fileModifiedTime") long fileModifiedTime,
@JsonProperty("schema") Properties schema,
@JsonProperty("schema") Map<String, String> schema,
@JsonProperty("partitionKeys") List<HivePartitionKey> partitionKeys,
@JsonProperty("readBucketNumber") OptionalInt readBucketNumber,
@JsonProperty("tableBucketNumber") OptionalInt tableBucketNumber,
@@ -105,7 +105,7 @@ public HiveSplit(
long length,
long estimatedFileSize,
long fileModifiedTime,
Properties schema,
Map<String, String> schema,
List<HivePartitionKey> partitionKeys,
List<HostAddress> addresses,
OptionalInt readBucketNumber,
@@ -188,7 +188,7 @@ public long getFileModifiedTime()
}

@JsonProperty
public Properties getSchema()
public Map<String, String> getSchema()
{
return schema;
}
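
HiveSplit keeps its schema as a @JsonProperty field, so it can be serialized to JSON when the split is shipped to workers, and a String-to-String map round-trips as a plain JSON object. A small, generic Jackson illustration of that round trip (not the connector's actual serialization setup):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.LinkedHashMap;
import java.util.Map;

public class SplitSchemaJsonSketch
{
    public static void main(String[] args)
            throws Exception
    {
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("columns", "id,name");
        schema.put("columns.types", "bigint:string");

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(schema);
        System.out.println(json); // {"columns":"id,name","columns.types":"bigint:string"}

        // Comes back as a String-to-String map with the same entries.
        @SuppressWarnings("unchecked")
        Map<String, String> roundTripped = mapper.readValue(json, Map.class);
        System.out.println(roundTripped.equals(schema)); // true
    }
}
```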
@@ -57,7 +57,6 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
@@ -299,7 +298,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
}

UpdateMode updateMode;
Properties schema;
Map<String, String> schema = new HashMap<>();
WriteInfo writeInfo;
StorageFormat outputStorageFormat;
HiveCompressionCodec compressionCodec;
@@ -308,11 +307,10 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
// Write to: a new partition in a new partitioned table,
// or a new unpartitioned table.
updateMode = UpdateMode.NEW;
schema = new Properties();
schema.setProperty(LIST_COLUMNS, dataColumns.stream()
schema.put(LIST_COLUMNS, dataColumns.stream()
.map(DataColumn::getName)
.collect(joining(",")));
schema.setProperty(LIST_COLUMN_TYPES, dataColumns.stream()
schema.put(LIST_COLUMN_TYPES, dataColumns.stream()
.map(DataColumn::getHiveType)
.map(HiveType::getHiveTypeName)
.map(HiveTypeName::toString)
@@ -371,7 +369,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
}
}

schema = getHiveSchema(table);
schema.putAll(getHiveSchema(table));
}

if (partitionName.isPresent()) {
@@ -417,7 +415,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt

outputStorageFormat = partition.get().getStorage().getStorageFormat();
compressionCodec = selectCompressionCodec(session, outputStorageFormat);
schema = getHiveSchema(partition.get(), table);
schema.putAll(getHiveSchema(partition.get(), table));

writeInfo = locationService.getPartitionWriteInfo(locationHandle, partition, partitionName.get());
break;
@@ -431,7 +429,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt

outputStorageFormat = fromHiveStorageFormat(partitionStorageFormat);
compressionCodec = selectCompressionCodec(session, partitionStorageFormat);
schema = getHiveSchema(table);
schema.putAll(getHiveSchema(table));

writeInfo = locationService.getPartitionWriteInfo(locationHandle, Optional.empty(), partitionName.get());
break;
@@ -442,7 +440,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
}
}

additionalTableParameters.forEach(schema::setProperty);
schema.putAll(additionalTableParameters);

validateSchema(partitionName, schema);

@@ -623,7 +621,7 @@ public SortingFileWriter makeRowIdSortingWriter(FileWriter deleteFileWriter, Loc
OrcFileWriterFactory::createOrcDataSink);
}

private void validateSchema(Optional<String> partitionName, Properties schema)
private void validateSchema(Optional<String> partitionName, Map<String, String> schema)
{
// existing tables may have columns in a different order
List<String> fileColumnNames = getColumnNames(schema);
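
In the writer factory above, the schema is now accumulated into one mutable map instead of being reassigned in each branch: setProperty becomes put, the table or partition schema from getHiveSchema is merged with putAll, and additionalTableParameters.forEach(schema::setProperty) collapses to a single putAll. A rough standalone sketch of that accumulation pattern, with hypothetical names and values:

```java
import java.util.HashMap;
import java.util.Map;

public class WriterSchemaSketch
{
    static Map<String, String> buildSchema(
            boolean newTable,
            Map<String, String> storedHiveSchema,
            Map<String, String> additionalTableParameters)
    {
        Map<String, String> schema = new HashMap<>();
        if (newTable) {
            // New table or partition: derive the column list and types directly.
            schema.put("columns", "id,name");
            schema.put("columns.types", "bigint:string");
        }
        else {
            // Existing table or partition: merge the schema stored in the metastore.
            schema.putAll(storedHiveSchema);
        }
        // Extra table parameters overwrite earlier entries on key collisions,
        // matching the old forEach(schema::setProperty) behavior.
        schema.putAll(additionalTableParameters);
        return schema;
    }
}
```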
@@ -20,9 +20,9 @@
import io.trino.spi.HostAddress;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.function.BooleanSupplier;

import static com.google.common.base.MoreObjects.toStringHelper;
@@ -37,13 +37,13 @@
@NotThreadSafe
public class InternalHiveSplit
{
private static final int INSTANCE_SIZE = instanceSize(InternalHiveSplit.class) + instanceSize(Properties.class) + instanceSize(OptionalInt.class);
private static final int INSTANCE_SIZE = instanceSize(InternalHiveSplit.class) + instanceSize(OptionalInt.class);

private final String path;
private final long end;
private final long estimatedFileSize;
private final long fileModifiedTime;
private final Properties schema;
private final Map<String, String> schema;
private final List<HivePartitionKey> partitionKeys;
private final List<InternalHiveBlock> blocks;
private final String partitionName;
@@ -67,7 +67,7 @@ public InternalHiveSplit(
long end,
long estimatedFileSize,
long fileModifiedTime,
Properties schema,
Map<String, String> schema,
List<HivePartitionKey> partitionKeys,
List<InternalHiveBlock> blocks,
OptionalInt readBucketNumber,
@@ -141,7 +141,7 @@ public long getFileModifiedTime()
return fileModifiedTime;
}

public Properties getSchema()
public Map<String, String> getSchema()
{
return schema;
}
@@ -31,9 +31,9 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -81,7 +81,7 @@ public class MergeFileWriter
private final RowIdSortingFileWriterMaker sortingFileWriterMaker;
private final OrcFileWriterFactory orcFileWriterFactory;
private final HiveCompressionCodec compressionCodec;
private final Properties hiveAcidSchema;
private final Map<String, String> hiveAcidSchema;
private final String bucketFilename;
private Optional<FileWriter> deleteFileWriter = Optional.empty();
private Optional<FileWriter> insertFileWriter = Optional.empty();
@@ -247,14 +247,12 @@ private Page buildDeletePage(Block rowIds, long writeId)
private FileWriter getOrCreateInsertFileWriter()
{
if (insertFileWriter.isEmpty()) {
Properties schemaCopy = new Properties();
schemaCopy.putAll(hiveAcidSchema);
insertFileWriter = orcFileWriterFactory.createFileWriter(
deltaDirectory.appendPath(bucketFilename),
ACID_COLUMN_NAMES,
fromHiveStorageFormat(ORC),
compressionCodec,
schemaCopy,
hiveAcidSchema,
session,
bucketNumber,
transaction,
@@ -267,15 +265,13 @@ private FileWriter getOrCreateInsertFileWriter()
private FileWriter getOrCreateDeleteFileWriter()
{
if (deleteFileWriter.isEmpty()) {
Properties schemaCopy = new Properties();
schemaCopy.putAll(hiveAcidSchema);
Location deletePath = deleteDeltaDirectory.appendPath(bucketFilename);
FileWriter writer = getWriter(orcFileWriterFactory.createFileWriter(
deletePath,
ACID_COLUMN_NAMES,
fromHiveStorageFormat(ORC),
compressionCodec,
schemaCopy,
hiveAcidSchema,
session,
bucketNumber,
transaction,
@@ -14,7 +14,6 @@
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
@@ -36,9 +35,9 @@
import java.io.Closeable;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.function.Supplier;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
@@ -91,7 +90,7 @@ public Optional<FileWriter> createFileWriter(
List<String> inputColumnNames,
StorageFormat storageFormat,
HiveCompressionCodec compressionCodec,
Properties schema,
Map<String, String> schema,
ConnectorSession session,
OptionalInt bucketNumber,
AcidTransaction transaction,
@@ -107,7 +106,7 @@ public Optional<FileWriter> createFileWriter(
columnEncodingFactory = new BinaryColumnEncodingFactory(timeZone);
}
else if (COLUMNAR_SERDE_CLASS.equals(storageFormat.getSerde())) {
columnEncodingFactory = new TextColumnEncodingFactory(TextEncodingOptions.fromSchema(Maps.fromProperties(schema)));
columnEncodingFactory = new TextColumnEncodingFactory(TextEncodingOptions.fromSchema(schema));
}
else {
return Optional.empty();
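
With the schema already typed as Map<String, String>, the Guava Maps.fromProperties bridge in front of TextEncodingOptions.fromSchema goes away, along with its import. Where code still receives a legacy Properties object, that bridge is a one-liner; a hedged sketch of the conversion this diff removes:

```java
import com.google.common.collect.Maps;

import java.util.Map;
import java.util.Properties;

public class PropertiesBridgeSketch
{
    // Copies a Properties object into an ImmutableMap<String, String>;
    // this is the conversion step that is no longer needed here.
    static Map<String, String> toMap(Properties legacySchema)
    {
        return Maps.fromProperties(legacySchema);
    }
}
```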
@@ -14,15 +14,16 @@
package io.trino.plugin.hive.acid;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.HiveTypeName;
import io.trino.spi.type.RowType;
import io.trino.spi.type.RowType.Field;
import io.trino.spi.type.Type;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.hive.HiveType.HIVE_INT;
@@ -61,17 +62,17 @@ public final class AcidSchema

private AcidSchema() {}

public static Properties createAcidSchema(HiveType rowType)
public static Map<String, String> createAcidSchema(HiveType rowType)
{
Properties hiveAcidSchema = new Properties();
hiveAcidSchema.setProperty(LIST_COLUMNS, String.join(",", ACID_COLUMN_NAMES));
// We must supply an accurate row type, because Apache ORC code we don't control has a consistency
// check that the layout of this "row" must agree with the layout of an inserted row.
hiveAcidSchema.setProperty(LIST_COLUMN_TYPES, createAcidColumnHiveTypes(rowType).stream()
.map(HiveType::getHiveTypeName)
.map(HiveTypeName::toString)
.collect(joining(":")));
return hiveAcidSchema;
return ImmutableMap.<String, String>builder()
.put(LIST_COLUMNS, String.join(",", ACID_COLUMN_NAMES))
// We must supply an accurate row type, because Apache ORC code we don't control has a consistency
// check that the layout of this "row" must agree with the layout of an inserted row.
.put(LIST_COLUMN_TYPES, createAcidColumnHiveTypes(rowType).stream()
.map(HiveType::getHiveTypeName)
.map(HiveTypeName::toString)
.collect(joining(":")))
.buildOrThrow();
}

public static Type createRowType(List<String> names, List<Type> types)
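
createAcidSchema now builds its result with ImmutableMap.builder(); buildOrThrow() fails fast on duplicate keys, the returned map is immutable, and call sites such as MergeFileWriter drop the defensive Properties copies they previously made before handing the schema to each writer. A hedged sketch of that pattern, with illustrative ACID column names and types:

```java
import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class ImmutableSchemaSketch
{
    static Map<String, String> buildAcidLikeSchema()
    {
        return ImmutableMap.<String, String>builder()
                .put("columns", "operation,originalTransaction,bucket,rowId,currentTransaction,row")
                .put("columns.types", "int:bigint:int:bigint:bigint:struct<id:bigint,name:string>")
                .buildOrThrow();
    }

    public static void main(String[] args)
    {
        Map<String, String> schema = buildAcidLikeSchema();
        System.out.println(schema.get("columns"));
        // schema.put("anything", "else") would throw UnsupportedOperationException.
    }
}
```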