Skip to content

Commit

Permalink
Use renamed column name in $partitions table
Browse files Browse the repository at this point in the history
  • Loading branch information
jinyangli34 committed Nov 13, 2024
1 parent 70ed876 commit 9e13de3
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.transforms.Transforms;

import java.util.List;
import java.util.function.Consumer;
Expand All @@ -25,6 +26,7 @@
import java.util.regex.Pattern;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.iceberg.PartitionTransforms.partitionNameTransform;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
Expand Down Expand Up @@ -97,31 +99,33 @@ public static void parsePartitionField(PartitionSpec.Builder builder, String fie
}) ||
tryMatch(field, YEAR_PATTERN, match -> {
String column = fromIdentifierToColumn(match.group(1));
builder.year(column, column + "_year" + suffix);
builder.year(column, partitionNameTransform(Transforms.year(), column) + suffix);
}) ||
tryMatch(field, MONTH_PATTERN, match -> {
String column = fromIdentifierToColumn(match.group(1));
builder.month(column, column + "_month" + suffix);
builder.month(column, partitionNameTransform(Transforms.month(), column) + suffix);
}) ||
tryMatch(field, DAY_PATTERN, match -> {
String column = fromIdentifierToColumn(match.group(1));
builder.day(column, column + "_day" + suffix);
builder.day(column, partitionNameTransform(Transforms.day(), column) + suffix);
}) ||
tryMatch(field, HOUR_PATTERN, match -> {
String column = fromIdentifierToColumn(match.group(1));
builder.hour(column, column + "_hour" + suffix);
builder.hour(column, partitionNameTransform(Transforms.hour(), column) + suffix);
}) ||
tryMatch(field, BUCKET_PATTERN, match -> {
String column = fromIdentifierToColumn(match.group(1));
builder.bucket(column, parseInt(match.group(2)), column + "_bucket" + suffix);
int bucketCount = parseInt(match.group(2));
builder.bucket(column, bucketCount, partitionNameTransform(Transforms.bucket(bucketCount), column) + suffix);
}) ||
tryMatch(field, TRUNCATE_PATTERN, match -> {
String column = fromIdentifierToColumn(match.group(1));
builder.truncate(column, parseInt(match.group(2)), column + "_trunc" + suffix);
int width = parseInt(match.group(2));
builder.truncate(column, width, partitionNameTransform(Transforms.truncate(width), column) + suffix);
}) ||
tryMatch(field, VOID_PATTERN, match -> {
String column = fromIdentifierToColumn(match.group(1));
builder.alwaysNull(column, column + "_null" + suffix);
builder.alwaysNull(column, partitionNameTransform(Transforms.alwaysNull(), column) + suffix);
});
if (!matched) {
throw new IllegalArgumentException("Invalid partition field declaration: " + field);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@
import java.util.Set;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions;
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
import static io.trino.plugin.iceberg.PartitionTransforms.partitionNameTransform;
import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.spi.block.RowValueBuilder.buildRowValue;
Expand Down Expand Up @@ -166,15 +168,34 @@ private Optional<IcebergPartitionColumn> getPartitionColumnType(List<PartitionFi
if (fields.isEmpty()) {
return Optional.empty();
}
List<RowType.Field> partitionFields = fields.stream()
.map(field -> RowType.field(
field.name(),
toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager)))
.collect(toImmutableList());

Map<Integer, String> fieldNames = TypeUtil.indexNameById(icebergTable.schema().asStruct());
ImmutableList.Builder<RowType.Field> fieldsBuilder = ImmutableList.builder();
Map<String, Integer> nameSuffix = new HashMap<>();
schema.columns().stream().map(NestedField::name).forEach(name -> nameSuffix.put(name, 1));

for (PartitionField field : fields) {
String sourceName = fieldNames.getOrDefault(field.sourceId(), field.name());
String targetName;
if (field.transform().isIdentity()) {
checkState(schema.findField(sourceName).fieldId() == field.sourceId(), "Partition field %s is not identity", field);
targetName = partitionNameTransform(field.transform(), sourceName);
}
else {
targetName = partitionNameTransform(field.transform(), sourceName);
int suffix = nameSuffix.compute(targetName, (_, v) -> v == null ? 1 : v + 1);
if (suffix > 1) {
targetName = targetName + "_" + suffix;
}
}
io.trino.spi.type.Type type = toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager);
fieldsBuilder.add(RowType.field(targetName, type));
}

List<Integer> fieldIds = fields.stream()
.map(PartitionField::fieldId)
.collect(toImmutableList());
return Optional.of(new IcebergPartitionColumn(RowType.from(partitionFields), fieldIds));
return Optional.of(new IcebergPartitionColumn(RowType.from(fieldsBuilder.build()), fieldIds));
}

private Optional<RowType> getMetricsColumnType(List<NestedField> columns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.spi.type.VarcharType;
import jakarta.annotation.Nullable;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.transforms.Transform;
import org.joda.time.DateTimeField;
import org.joda.time.chrono.ISOChronology;

Expand Down Expand Up @@ -160,6 +161,41 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type sour
throw new UnsupportedOperationException("Unsupported partition transform: " + field);
}

/**
 * Derives the display name of a partition column in the {@code $partitions}
 * metadata table from the transform applied to {@code columnName}.
 * Identity keeps the bare column name; every other transform appends a
 * transform-specific suffix (e.g. {@code _bucket}, {@code _trunc}).
 *
 * @throws UnsupportedOperationException if the transform is not recognized
 */
public static String partitionNameTransform(Transform<?, ?> transform, String columnName)
{
    String transformName = transform.toString();

    // Fixed-name transforms map directly to a suffix; identity maps to none.
    String suffix = switch (transformName) {
        case "identity" -> "";
        case "void" -> "_null";
        case "year" -> "_year";
        case "month" -> "_month";
        case "day" -> "_day";
        case "hour" -> "_hour";
        default -> null;
    };
    if (suffix != null) {
        return columnName + suffix;
    }

    // Parameterized transforms (bucket, truncate) carry their argument in the
    // string form, so they are recognized by pattern instead of exact match.
    if (BUCKET_PATTERN.matcher(transformName).matches()) {
        return columnName + "_bucket";
    }
    if (TRUNCATE_PATTERN.matcher(transformName).matches()) {
        return columnName + "_trunc";
    }
    throw new UnsupportedOperationException("Unsupported partition transform: " + transform);
}

private static ColumnTransform identity(Type type)
{
return new ColumnTransform(type, false, true, false, Function.identity(), ValueTransform.identity(type));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RowType.field;
import static io.trino.spi.type.RowType.rowType;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
Expand Down Expand Up @@ -381,6 +387,73 @@ public void testFilesTableWithDelete()
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_delete");
}

@Test
public void testPartitionRename()
{
    // Renaming a partition source column must be reflected in the partition
    // struct exposed by the $partitions metadata table.
    String table = "test_partition_rename_" + randomNameSuffix();
    assertUpdate(
            "CREATE TABLE " + table + " WITH (partitioning = ARRAY['par1', '\"par2.f1\"']) AS SELECT 1 as v, 11 as par1, CAST(ROW(21, 22) AS ROW(f1 integer, f2 integer)) as par2",
            1);

    // Top-level identity partition column
    assertQuerySucceeds("ALTER TABLE " + table + " RENAME COLUMN par1 to par1_renamed");
    assertQuery("SELECT partition.par1_renamed FROM \"" + table + "$partitions\"", "VALUES 11");

    // Nested field used as a partition source
    assertQuerySucceeds("ALTER TABLE " + table + " RENAME COLUMN par2 to par2_renamed");
    assertQuery("SELECT partition.\"par2_renamed.f1\" FROM \"" + table + "$partitions\"", "VALUES 21");

    assertUpdate("DROP TABLE " + table);
}

@Test
public void testBucketingRename()
{
    // A bucket partition column appears in $partitions as <source>_bucket and
    // must track renames of its source column.
    String table = "test_bucketing_rename_" + randomNameSuffix();
    assertUpdate("CREATE TABLE " + table + " WITH (partitioning = ARRAY['bucket(par, 4)']) AS SELECT 1 as v, 11 as par", 1);

    assertQuery("SELECT partition.par_bucket FROM \"" + table + "$partitions\"", "VALUES 3");

    assertQuerySucceeds("ALTER TABLE " + table + " RENAME COLUMN par to par_renamed");
    assertQuery("SELECT partition.par_renamed_bucket FROM \"" + table + "$partitions\"", "VALUES 3");

    assertUpdate("DROP TABLE " + table);
}

@Test
public void testPartitionColumns()
{
    // Exercises every partition transform and verifies the name and type of
    // each sub-column of the $partitions partition struct: identity columns
    // keep their own name, the others get a transform-specific suffix.
    String tableName = "test_partition_columns_" + randomNameSuffix();
    assertUpdate("""
            CREATE TABLE %s WITH (partitioning = ARRAY[
            'c1',
            '"r1.f1"',
            'year(d1)',
            'month(d2)',
            'day(d3)',
            'hour(d4)',
            'bucket(b1, 4)',
            'truncate(t1, 2)']) AS
            SELECT
            CAST('c1' AS VARCHAR) as c1
            , CAST(ROW(1, 2) AS ROW(f1 integer, f2 integer)) as r1
            , CAST('2022-01-01 01:01:01' AS TIMESTAMP) as d1
            , CAST('2022-01-01 01:01:01' AS TIMESTAMP) as d2
            , CAST('2022-01-01 01:01:01' AS TIMESTAMP) as d3
            , CAST('2022-01-01 01:01:01' AS TIMESTAMP) as d4
            , 1 as b1
            , CAST('12345678' AS VARCHAR) as t1""".formatted(tableName), 1);

    assertThat(query("SELECT partition FROM \"" + tableName + "$partitions\""))
            .result()
            .hasTypes(ImmutableList.of(rowType(
                    field("c1", VARCHAR),
                    field("r1.f1", INTEGER),
                    field("d1_year", INTEGER),
                    field("d2_month", INTEGER),
                    field("d3_day", DATE),
                    field("d4_hour", INTEGER),
                    field("b1_bucket", INTEGER),
                    field("t1_trunc", VARCHAR))))
            .matches("SELECT CAST(ROW('c1', 1, 52, 624, DATE '2022-01-01', 455833, 0, '12') AS ROW(c1 varchar, \"r1.f1\" integer, d1_year integer, d2_month integer, d3_day date, d4_hour integer, b1_bucket integer, t1_trunc varchar))");

    // Clean up the created table, consistent with the other tests in this class
    // (the original test leaked the table).
    assertUpdate("DROP TABLE " + tableName);
}

private Long nanCount(long value)
{
// Parquet does not have nan count metrics
Expand Down

0 comments on commit 9e13de3

Please sign in to comment.