Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Iceberg materialized view storage tables from metastores #18853

Merged
merged 5 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.plugin.hive.HiveCompressionCodec;
import jakarta.validation.constraints.AssertFalse;
import jakarta.validation.constraints.DecimalMax;
import jakarta.validation.constraints.DecimalMin;
import jakarta.validation.constraints.Max;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class IcebergConfig
// to avoid deleting those files if Trino is unable to check.
private boolean deleteSchemaLocationsFallback;
private double minimumAssignedSplitWeight = 0.05;
private boolean hideMaterializedViewStorageTable = true;
private Optional<String> materializedViewsStorageSchema = Optional.empty();
private boolean sortedWritingEnabled = true;
private boolean queryPartitionFilterRequired;
Expand Down Expand Up @@ -342,6 +344,19 @@ public double getMinimumAssignedSplitWeight()
return minimumAssignedSplitWeight;
}

public boolean isHideMaterializedViewStorageTable()
{
return hideMaterializedViewStorageTable;
}

@Config("iceberg.materialized-views.hide-storage-table")
@ConfigDescription("Hide materialized view storage tables in metastore")
public IcebergConfig setHideMaterializedViewStorageTable(boolean hideMaterializedViewStorageTable)
{
this.hideMaterializedViewStorageTable = hideMaterializedViewStorageTable;
return this;
}

@NotNull
public Optional<String> getMaterializedViewsStorageSchema()
{
Expand Down Expand Up @@ -381,4 +396,10 @@ public boolean isQueryPartitionFilterRequired()
{
return queryPartitionFilterRequired;
}

@AssertFalse(message = "iceberg.materialized-views.storage-schema may only be set when iceberg.materialized-views.hide-storage-table is set to false")
public boolean isStorageSchemaSetWhenHidingIsEnabled()
{
return hideMaterializedViewStorageTable && materializedViewsStorageSchema.isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private IcebergFileWriter createAvroWriter(
.collect(toImmutableList());

return new IcebergAvroFileWriter(
new ForwardingOutputFile(fileSystem, outputPath.toString()),
new ForwardingOutputFile(fileSystem, outputPath),
rollbackAction,
icebergSchema,
columnTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergTableName.isDataTable;
import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage;
import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
Expand Down Expand Up @@ -265,6 +268,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.STALE;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN;
Expand Down Expand Up @@ -384,7 +388,21 @@ public ConnectorTableHandle getTableHandle(
throw new TrinoException(NOT_SUPPORTED, "Read table with start version is not supported");
}

if (!IcebergTableName.isDataTable(tableName.getTableName())) {
if (isMaterializedViewStorage(tableName.getTableName())) {
verify(endVersion.isEmpty(), "Materialized views do not support versioned queries");

SchemaTableName materializedViewName = new SchemaTableName(tableName.getSchemaName(), tableNameFrom(tableName.getTableName()));
if (getMaterializedView(session, materializedViewName).isEmpty()) {
throw new TableNotFoundException(tableName);
}

BaseTable storageTable = catalog.getMaterializedViewStorageTable(session, materializedViewName)
.orElseThrow(() -> new TrinoException(TABLE_NOT_FOUND, "Storage table metadata not found for materialized view " + tableName));

return tableHandleForCurrentSnapshot(tableName, storageTable);
}
findepi marked this conversation as resolved.
Show resolved Hide resolved

if (!isDataTable(tableName.getTableName())) {
// Pretend the table does not exist to produce better error message in case of table redirects to Hive
return null;
}
Expand All @@ -405,23 +423,36 @@ public ConnectorTableHandle getTableHandle(
throw e;
}

Optional<Long> tableSnapshotId;
Schema tableSchema;
Optional<PartitionSpec> partitionSpec;
if (endVersion.isPresent()) {
long snapshotId = getSnapshotIdFromVersion(session, table, endVersion.get());
tableSnapshotId = Optional.of(snapshotId);
tableSchema = schemaFor(table, snapshotId);
partitionSpec = Optional.empty();
}
else {
tableSnapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId);
tableSchema = table.schema();
partitionSpec = Optional.of(table.spec());
return tableHandleForSnapshot(
tableName,
table,
Optional.of(snapshotId),
schemaFor(table, snapshotId),
Optional.empty());
}
return tableHandleForCurrentSnapshot(tableName, table);
}

private IcebergTableHandle tableHandleForCurrentSnapshot(SchemaTableName tableName, BaseTable table)
{
return tableHandleForSnapshot(
tableName,
table,
Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId),
table.schema(),
Optional.of(table.spec()));
}

private IcebergTableHandle tableHandleForSnapshot(
SchemaTableName tableName,
BaseTable table,
Optional<Long> tableSnapshotId,
Schema tableSchema,
Optional<PartitionSpec> partitionSpec)
{
Map<String, String> tableProperties = table.properties();
String nameMappingJson = tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING);
return new IcebergTableHandle(
trinoCatalogHandle,
tableName.getSchemaName(),
Expand All @@ -435,7 +466,7 @@ public ConnectorTableHandle getTableHandle(
TupleDomain.all(),
OptionalLong.empty(),
ImmutableSet.of(),
Optional.ofNullable(nameMappingJson),
Optional.ofNullable(tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING)),
table.location(),
table.properties(),
false,
Expand Down Expand Up @@ -517,12 +548,12 @@ public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTabl

private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
{
if (IcebergTableName.isDataTable(tableName.getTableName())) {
if (isDataTable(tableName.getTableName()) || isMaterializedViewStorage(tableName.getTableName())) {
return Optional.empty();
}

// Only when dealing with an actual system table proceed to retrieve the base table for the system table
String name = IcebergTableName.tableNameFrom(tableName.getTableName());
String name = tableNameFrom(tableName.getTableName());
Table table;
try {
table = catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name));
Expand All @@ -549,6 +580,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
case FILES -> Optional.of(new FilesTable(systemTableName, typeManager, table, getCurrentSnapshotId(table)));
case PROPERTIES -> Optional.of(new PropertiesTable(systemTableName, table));
case REFS -> Optional.of(new RefsTable(systemTableName, table));
case MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected MATERIALIZED_VIEW_STORAGE table type");
};
}

Expand Down Expand Up @@ -2902,9 +2934,12 @@ public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession s
.orElseThrow(() -> new IllegalStateException("Storage table missing in definition of materialized view " + materializedViewName));

Table icebergTable = catalog.loadTable(session, storageTableName);
String dependsOnTables = icebergTable.currentSnapshot().summary().getOrDefault(DEPENDS_ON_TABLES, "");
String dependsOnTables = Optional.ofNullable(icebergTable.currentSnapshot())
.map(snapshot -> snapshot.summary().getOrDefault(DEPENDS_ON_TABLES, ""))
.orElse("");
if (dependsOnTables.isEmpty()) {
// Information missing. While it's "unknown" whether storage is stale, we return "stale": under no normal circumstances dependsOnTables should be missing.
// Information missing. While it's "unknown" whether storage is stale, we return "stale".
// Normally dependsOnTables may be missing only when there was no refresh yet.
return new MaterializedViewFreshness(STALE, Optional.empty());
}
Instant refreshTime = Optional.ofNullable(icebergTable.currentSnapshot().summary().get(TRINO_QUERY_START_TIME))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.regex.Pattern;

import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -80,4 +81,10 @@ public static boolean isDataTable(String name)
String typeString = match.group("type");
return typeString == null;
}

public static boolean isMaterializedViewStorage(String name)
{
Optional<TableType> tableType = tableTypeFrom(name);
return tableType.isPresent() && tableType.get() == MATERIALIZED_VIEW_STORAGE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec
return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, targetPath, createTableProperties(tableMetadata));
}

private static Map<String, String> createTableProperties(ConnectorTableMetadata tableMetadata)
public static Map<String, String> createTableProperties(ConnectorTableMetadata tableMetadata)
{
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
IcebergFileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ public enum TableType
PARTITIONS,
FILES,
PROPERTIES,
REFS
REFS,
MATERIALIZED_VIEW_STORAGE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static io.trino.plugin.hive.util.HiveClassNames.LAZY_SIMPLE_SERDE_CLASS;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_METADATA;
import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage;
import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME;
import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation;
import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
Expand Down Expand Up @@ -152,6 +153,11 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
return;
}

if (isMaterializedViewStorage(tableName)) {
commitMaterializedViewRefresh(base, metadata);
return;
}

if (base == null) {
if (PROVIDER_PROPERTY_VALUE.equals(metadata.properties().get(PROVIDER_PROPERTY_KEY))) {
// Assume this is a table executing migrate procedure
Expand All @@ -176,6 +182,8 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)

protected abstract void commitToExistingTable(TableMetadata base, TableMetadata metadata);

protected abstract void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata);
findepi marked this conversation as resolved.
Show resolved Hide resolved

@Override
public FileIO io()
{
Expand Down
Loading
Loading