diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 5dce684f6b50..5b3e0d7f8df5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -20,6 +20,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.plugin.hive.HiveCompressionCodec; +import jakarta.validation.constraints.AssertFalse; import jakarta.validation.constraints.DecimalMax; import jakarta.validation.constraints.DecimalMin; import jakarta.validation.constraints.Max; @@ -71,6 +72,7 @@ public class IcebergConfig // to avoid deleting those files if Trino is unable to check. private boolean deleteSchemaLocationsFallback; private double minimumAssignedSplitWeight = 0.05; + private boolean hideMaterializedViewStorageTable = true; private Optional materializedViewsStorageSchema = Optional.empty(); private boolean sortedWritingEnabled = true; private boolean queryPartitionFilterRequired; @@ -342,6 +344,19 @@ public double getMinimumAssignedSplitWeight() return minimumAssignedSplitWeight; } + public boolean isHideMaterializedViewStorageTable() + { + return hideMaterializedViewStorageTable; + } + + @Config("iceberg.materialized-views.hide-storage-table") + @ConfigDescription("Hide materialized view storage tables in metastore") + public IcebergConfig setHideMaterializedViewStorageTable(boolean hideMaterializedViewStorageTable) + { + this.hideMaterializedViewStorageTable = hideMaterializedViewStorageTable; + return this; + } + @NotNull public Optional getMaterializedViewsStorageSchema() { @@ -381,4 +396,10 @@ public boolean isQueryPartitionFilterRequired() { return queryPartitionFilterRequired; } + + @AssertFalse(message = "iceberg.materialized-views.storage-schema may only be set when iceberg.materialized-views.hide-storage-table is set to false") + public boolean isStorageSchemaSetWhenHidingIsEnabled() + { + return hideMaterializedViewStorageTable && materializedViewsStorageSchema.isPresent(); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java index 9d405b4683aa..b80236f45c9c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java @@ -296,7 +296,7 @@ private IcebergFileWriter createAvroWriter( .collect(toImmutableList()); return new IcebergAvroFileWriter( - new ForwardingOutputFile(fileSystem, outputPath.toString()), + new ForwardingOutputFile(fileSystem, outputPath), rollbackAction, icebergSchema, columnTypes, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 52a3f67e5030..2a4d774e32ec 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -224,6 +224,9 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isQueryPartitionFilterRequired; import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled; +import static io.trino.plugin.iceberg.IcebergTableName.isDataTable; +import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage; +import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom; import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; @@ -265,6 +268,7 @@ import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; +import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.STALE; import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN; @@ -384,7 +388,21 @@ public ConnectorTableHandle getTableHandle( throw new TrinoException(NOT_SUPPORTED, "Read table with start version is not supported"); } - if (!IcebergTableName.isDataTable(tableName.getTableName())) { + if (isMaterializedViewStorage(tableName.getTableName())) { + verify(endVersion.isEmpty(), "Materialized views do not support versioned queries"); + + SchemaTableName materializedViewName = new SchemaTableName(tableName.getSchemaName(), tableNameFrom(tableName.getTableName())); + if (getMaterializedView(session, materializedViewName).isEmpty()) { + throw new TableNotFoundException(tableName); + } + + BaseTable storageTable = catalog.getMaterializedViewStorageTable(session, materializedViewName) + .orElseThrow(() -> new TrinoException(TABLE_NOT_FOUND, "Storage table metadata not found for materialized view " + tableName)); + + return tableHandleForCurrentSnapshot(tableName, storageTable); + } + + if (!isDataTable(tableName.getTableName())) { // Pretend the table does not exist to produce better error message in case of table redirects to Hive return null; } @@ -405,23 +423,36 @@ public ConnectorTableHandle getTableHandle( throw e; } - Optional tableSnapshotId; - Schema tableSchema; - Optional partitionSpec; if (endVersion.isPresent()) { long snapshotId = getSnapshotIdFromVersion(session, table, endVersion.get()); - tableSnapshotId = Optional.of(snapshotId); - tableSchema = schemaFor(table, snapshotId); - partitionSpec = Optional.empty(); - } - else { - tableSnapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId); - tableSchema = table.schema(); - partitionSpec = Optional.of(table.spec()); + return tableHandleForSnapshot( + tableName, + table, + Optional.of(snapshotId), + schemaFor(table, snapshotId), + Optional.empty()); } + return tableHandleForCurrentSnapshot(tableName, table); + } + private IcebergTableHandle tableHandleForCurrentSnapshot(SchemaTableName tableName, BaseTable table) + { + return tableHandleForSnapshot( + tableName, + table, + Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId), + table.schema(), + Optional.of(table.spec())); + } + + private IcebergTableHandle tableHandleForSnapshot( + SchemaTableName tableName, + BaseTable table, + Optional tableSnapshotId, + Schema tableSchema, + Optional partitionSpec) + { Map tableProperties = table.properties(); - String nameMappingJson = tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING); return new IcebergTableHandle( trinoCatalogHandle, tableName.getSchemaName(), @@ -435,7 +466,7 @@ public ConnectorTableHandle getTableHandle( TupleDomain.all(), OptionalLong.empty(), ImmutableSet.of(), - Optional.ofNullable(nameMappingJson), + Optional.ofNullable(tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING)), table.location(), table.properties(), false, @@ -517,12 +548,12 @@ public Optional getSystemTable(ConnectorSession session, SchemaTabl private Optional getRawSystemTable(ConnectorSession session, SchemaTableName tableName) { - if (IcebergTableName.isDataTable(tableName.getTableName())) { + if (isDataTable(tableName.getTableName()) || isMaterializedViewStorage(tableName.getTableName())) { return Optional.empty(); } // Only when dealing with an actual system table proceed to retrieve the base table for the system table - String name = IcebergTableName.tableNameFrom(tableName.getTableName()); + String name = tableNameFrom(tableName.getTableName()); Table table; try { table = catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name)); @@ -549,6 +580,7 @@ private Optional getRawSystemTable(ConnectorSession session, Schema case FILES -> Optional.of(new FilesTable(systemTableName, typeManager, table, getCurrentSnapshotId(table))); case PROPERTIES -> Optional.of(new PropertiesTable(systemTableName, table)); case REFS -> Optional.of(new RefsTable(systemTableName, table)); + case MATERIALIZED_VIEW_STORAGE -> throw new VerifyException("Unexpected MATERIALIZED_VIEW_STORAGE table type"); }; } @@ -2902,9 +2934,12 @@ public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession s .orElseThrow(() -> new IllegalStateException("Storage table missing in definition of materialized view " + materializedViewName)); Table icebergTable = catalog.loadTable(session, storageTableName); - String dependsOnTables = icebergTable.currentSnapshot().summary().getOrDefault(DEPENDS_ON_TABLES, ""); + String dependsOnTables = Optional.ofNullable(icebergTable.currentSnapshot()) + .map(snapshot -> snapshot.summary().getOrDefault(DEPENDS_ON_TABLES, "")) + .orElse(""); if (dependsOnTables.isEmpty()) { - // Information missing. While it's "unknown" whether storage is stale, we return "stale": under no normal circumstances dependsOnTables should be missing. + // Information missing. While it's "unknown" whether storage is stale, we return "stale". + // Normally dependsOnTables may be missing only when there was no refresh yet. return new MaterializedViewFreshness(STALE, Optional.empty()); } Instant refreshTime = Optional.ofNullable(icebergTable.currentSnapshot().summary().get(TRINO_QUERY_START_TIME)) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableName.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableName.java index 32716889471d..8e5582cc2fb9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableName.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableName.java @@ -20,6 +20,7 @@ import java.util.regex.Pattern; import static io.trino.plugin.iceberg.TableType.DATA; +import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -80,4 +81,10 @@ public static boolean isDataTable(String name) String typeString = match.group("type"); return typeString == null; } + + public static boolean isMaterializedViewStorage(String name) + { + Optional tableType = tableTypeFrom(name); + return tableType.isPresent() && tableType.get() == MATERIALIZED_VIEW_STORAGE; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index c13276cb22b1..10815d251d79 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -644,7 +644,7 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, targetPath, createTableProperties(tableMetadata)); } - private static Map createTableProperties(ConnectorTableMetadata tableMetadata) + public static Map createTableProperties(ConnectorTableMetadata tableMetadata) { ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); IcebergFileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java index 7141f488d7da..cea961b4d51a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java @@ -22,5 +22,6 @@ public enum TableType PARTITIONS, FILES, PROPERTIES, - REFS + REFS, + MATERIALIZED_VIEW_STORAGE, } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index 855f66de3f35..721c4dcf44f9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -48,6 +48,7 @@ import static io.trino.plugin.hive.util.HiveClassNames.LAZY_SIMPLE_SERDE_CLASS; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_METADATA; +import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage; import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME; import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation; import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; @@ -152,6 +153,11 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata) return; } + if (isMaterializedViewStorage(tableName)) { + commitMaterializedViewRefresh(base, metadata); + return; + } + if (base == null) { if (PROVIDER_PROPERTY_VALUE.equals(metadata.properties().get(PROVIDER_PROPERTY_KEY))) { // Assume this is a table executing migrate procedure @@ -176,6 +182,8 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata) protected abstract void commitToExistingTable(TableMetadata base, TableMetadata metadata); + protected abstract void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata); + @Override public FileIO io() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index a6f3fcbad350..dc518c966dd3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -18,12 +18,14 @@ import dev.failsafe.RetryPolicy; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HiveMetadata; import io.trino.plugin.iceberg.ColumnIdentity; import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition; import io.trino.plugin.iceberg.IcebergUtil; import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform; +import io.trino.plugin.iceberg.fileio.ForwardingOutputFile; import io.trino.spi.TrinoException; import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnMetadata; @@ -50,6 +52,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; import org.apache.iceberg.types.Types; @@ -57,7 +60,6 @@ import java.io.IOException; import java.time.Duration; import java.time.temporal.ChronoUnit; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -77,14 +79,21 @@ import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA; import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.getStorageSchema; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData; -import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; +import static io.trino.plugin.iceberg.IcebergTableName.tableNameWithType; import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; +import static io.trino.plugin.iceberg.IcebergTableProperties.getSortOrder; +import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation; +import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME; import static io.trino.plugin.iceberg.IcebergUtil.commit; +import static io.trino.plugin.iceberg.IcebergUtil.createTableProperties; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties; import static io.trino.plugin.iceberg.IcebergUtil.schemaFromMetadata; import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; import static io.trino.plugin.iceberg.PartitionTransforms.getColumnTransform; +import static io.trino.plugin.iceberg.SortFieldUtils.parseSortFields; +import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.SmallintType.SMALLINT; @@ -96,8 +105,10 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; import static org.apache.iceberg.TableMetadata.newTableMetadata; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableMetadataParser.getFileExtension; +import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION_DEFAULT; import static org.apache.iceberg.Transactions.createOrReplaceTableTransaction; import static org.apache.iceberg.Transactions.createTableTransaction; @@ -111,17 +122,20 @@ public abstract class AbstractTrinoCatalog private final CatalogName catalogName; private final TypeManager typeManager; protected final IcebergTableOperationsProvider tableOperationsProvider; + private final TrinoFileSystemFactory fileSystemFactory; private final boolean useUniqueTableLocation; protected AbstractTrinoCatalog( CatalogName catalogName, TypeManager typeManager, IcebergTableOperationsProvider tableOperationsProvider, + TrinoFileSystemFactory fileSystemFactory, boolean useUniqueTableLocation) { this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.useUniqueTableLocation = useUniqueTableLocation; } @@ -264,17 +278,52 @@ protected void deleteTableDirectory(TrinoFileSystem fileSystem, SchemaTableName } } + protected Location createMaterializedViewStorage(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition definition) + { + if (getStorageSchema(definition.getProperties()).isPresent()) { + throw new TrinoException(NOT_SUPPORTED, "Materialized view property '%s' is not supported when hiding materialized view storage tables is enabled".formatted(STORAGE_SCHEMA)); + } + SchemaTableName storageTableName = new SchemaTableName(viewName.getSchemaName(), tableNameWithType(viewName.getTableName(), MATERIALIZED_VIEW_STORAGE)); + String tableLocation = getTableLocation(definition.getProperties()) + .orElseGet(() -> defaultTableLocation(session, viewName)); + List columns = columnsForMaterializedView(definition); + + Schema schema = schemaFromMetadata(columns); + PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(definition.getProperties())); + SortOrder sortOrder = parseSortFields(schema, getSortOrder(definition.getProperties())); + Map properties = createTableProperties(new ConnectorTableMetadata(storageTableName, columns, definition.getProperties(), Optional.empty())); + + TableMetadata metadata = newTableMetadata(schema, partitionSpec, sortOrder, tableLocation, properties); + + String fileName = format("%05d-%s%s", 0, randomUUID(), getFileExtension(METADATA_COMPRESSION_DEFAULT)); + Location metadataFileLocation = Location.of(tableLocation).appendPath(METADATA_FOLDER_NAME).appendPath(fileName); + + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + TableMetadataParser.write(metadata, new ForwardingOutputFile(fileSystem, metadataFileLocation)); + + return metadataFileLocation; + } + protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition definition) { // Generate a storage table name and create a storage table. The properties in the definition are table properties for the // storage table as indicated in the materialized view definition. String storageTableName = "st_" + randomUUID().toString().replace("-", ""); - Map storageTableProperties = new HashMap<>(definition.getProperties()); - storageTableProperties.putIfAbsent(FILE_FORMAT_PROPERTY, DEFAULT_FILE_FORMAT_DEFAULT); String storageSchema = getStorageSchema(definition.getProperties()).orElse(viewName.getSchemaName()); SchemaTableName storageTable = new SchemaTableName(storageSchema, storageTableName); + List columns = columnsForMaterializedView(definition); + + ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, definition.getProperties(), Optional.empty()); + Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session, false); + AppendFiles appendFiles = transaction.newAppend(); + commit(appendFiles, session); + transaction.commitTransaction(); + return storageTable; + } + private List columnsForMaterializedView(ConnectorMaterializedViewDefinition definition) + { Schema schemaWithTimestampTzPreserved = schemaFromMetadata(mappedCopy( definition.getColumns(), column -> { @@ -301,7 +350,7 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se }) .collect(toImmutableSet()); - List columns = mappedCopy( + return mappedCopy( definition.getColumns(), column -> { Type type = typeManager.getType(column.getType()); @@ -314,13 +363,6 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se } return new ColumnMetadata(column.getName(), type); }); - - ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty()); - Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session, false); - AppendFiles appendFiles = transaction.newAppend(); - commit(appendFiles, session); - transaction.commitTransaction(); - return storageTable; } /** @@ -418,6 +460,17 @@ protected Map createMaterializedViewProperties(ConnectorSession .buildOrThrow(); } + protected Map createMaterializedViewProperties(ConnectorSession session, Location storageMetadataLocation) + { + return ImmutableMap.builder() + .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) + .put(METADATA_LOCATION_PROP, storageMetadataLocation.toString()) + .put(PRESTO_VIEW_FLAG, "true") + .put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE) + .put(TABLE_COMMENT, ICEBERG_MATERIALIZED_VIEW_COMMENT) + .buildOrThrow(); + } + protected static class MaterializedViewMayBeBeingRemovedException extends RuntimeException { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index 03ba84d0f533..05ca9a5a6739 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -24,6 +24,7 @@ import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.security.TrinoPrincipal; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; @@ -168,6 +169,8 @@ void createMaterializedView( Optional getMaterializedView(ConnectorSession session, SchemaTableName viewName); + Optional getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName); + void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target); void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional comment); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java index e25fac198f77..05e4b1357b26 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java @@ -27,11 +27,14 @@ import org.apache.iceberg.io.FileIO; import java.util.Optional; +import java.util.function.BiFunction; import static com.google.common.base.Preconditions.checkState; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED; import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; +import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom; import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP; @NotThreadSafe public class FileMetastoreTableOperations @@ -53,9 +56,24 @@ public FileMetastoreTableOperations( protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) { Table currentTable = getTable(); + commitTableUpdate(currentTable, metadata, (table, newMetadataLocation) -> Table.builder(table) + .apply(builder -> updateMetastoreTable(builder, metadata, newMetadataLocation, Optional.of(currentMetadataLocation))) + .build()); + } + + @Override + protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata) + { + Table materializedView = getTable(database, tableNameFrom(tableName)); + commitTableUpdate(materializedView, metadata, (table, newMetadataLocation) -> Table.builder(table) + .apply(builder -> builder.setParameter(METADATA_LOCATION_PROP, newMetadataLocation).setParameter(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation)) + .build()); + } + private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction tableUpdateFunction) + { checkState(currentMetadataLocation != null, "No current metadata location for existing table"); - String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION_PROP); + String metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP); if (!currentMetadataLocation.equals(metadataLocation)) { throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s", currentMetadataLocation, metadataLocation, getSchemaTableName()); @@ -63,15 +81,13 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1); - Table table = Table.builder(currentTable) - .apply(builder -> updateMetastoreTable(builder, metadata, newMetadataLocation, Optional.of(currentMetadataLocation))) - .build(); + Table updatedTable = tableUpdateFunction.apply(table, newMetadataLocation); // todo privileges should not be replaced for an alter PrincipalPrivileges privileges = table.getOwner().map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES); try { - metastore.replaceTable(database, tableName, table, privileges); + metastore.replaceTable(database, table.getTableName(), updatedTable, privileges); } catch (RuntimeException e) { if (e instanceof TrinoException trinoException && diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java index 50e19f2c43fa..70677a25b510 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java @@ -38,8 +38,10 @@ import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.io.FileIO; +import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.function.BiFunction; import static com.google.common.base.Verify.verify; import static io.trino.plugin.hive.ViewReaderUtil.isTrinoMaterializedView; @@ -48,6 +50,9 @@ import static io.trino.plugin.hive.metastore.glue.converter.GlueToTrinoConverter.getTableType; import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; +import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage; +import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom; +import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getMaterializedViewTableInput; import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getTableInput; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -90,17 +95,25 @@ protected GlueIcebergTableOperations( @Override protected String getRefreshedLocation(boolean invalidateCaches) { - Table table = getTable(invalidateCaches); + boolean isMaterializedViewStorageTable = isMaterializedViewStorage(tableName); + + Table table; + if (isMaterializedViewStorageTable) { + table = getTable(database, tableNameFrom(tableName), invalidateCaches); + } + else { + table = getTable(database, tableName, invalidateCaches); + } glueVersionId = table.getVersionId(); String tableType = getTableType(table); Map parameters = getTableParameters(table); - if (isTrinoView(tableType, parameters) || isTrinoMaterializedView(tableType, parameters)) { + if (!isMaterializedViewStorageTable && (isTrinoView(tableType, parameters) || isTrinoMaterializedView(tableType, parameters))) { // this is a Hive view or Trino/Presto view, or Trino materialized view, hence not a table // TODO table operations should not be constructed for views (remove exception-driven code path) throw new TableNotFoundException(getSchemaTableName()); } - if (!isIcebergTable(parameters)) { + if (!isMaterializedViewStorageTable && !isIcebergTable(parameters)) { throw new UnknownTableTypeException(getSchemaTableName()); } @@ -138,15 +151,43 @@ protected void commitNewTable(TableMetadata metadata) @Override protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) { - String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1); - TableInput tableInput = getTableInput( - typeManager, - tableName, - owner, + commitTableUpdate( + getTable(database, tableName, false), + metadata, + (table, newMetadataLocation) -> + getTableInput( + typeManager, + tableName, + owner, + metadata, + newMetadataLocation, + ImmutableMap.of(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation), + cacheTableMetadata)); + } + + @Override + protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata) + { + commitTableUpdate( + getTable(database, tableNameFrom(tableName), false), metadata, - newMetadataLocation, - ImmutableMap.of(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation), - cacheTableMetadata); + (table, newMetadataLocation) -> { + Map parameters = new HashMap<>(getTableParameters(table)); + parameters.put(METADATA_LOCATION_PROP, newMetadataLocation); + parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation); + + return getMaterializedViewTableInput( + table.getName(), + table.getViewOriginalText(), + table.getOwner(), + parameters); + }); + } + + private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction tableUpdateFunction) + { + String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1); + TableInput tableInput = tableUpdateFunction.apply(table, newMetadataLocation); UpdateTableRequest updateTableRequest = new UpdateTableRequest() .withDatabaseName(database) @@ -171,7 +212,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) shouldRefresh = true; } - private Table getTable(boolean invalidateCaches) + private Table getTable(String database, String tableName, boolean invalidateCaches) { return getGlueTable.get(new SchemaTableName(database, tableName), invalidateCaches); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 2c069ae085a9..e9d07c5ba332 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -41,6 +41,8 @@ import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.cache.EvictableCacheBuilder; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.SchemaAlreadyExistsException; @@ -52,7 +54,9 @@ import io.trino.plugin.iceberg.IcebergMetadata; import io.trino.plugin.iceberg.UnknownTableTypeException; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.fileio.ForwardingFileIo; import io.trino.spi.TrinoException; import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnMetadata; @@ -80,6 +84,7 @@ import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; +import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.io.FileIO; import java.time.Duration; @@ -100,6 +105,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.cache.CacheUtils.uncheckedCacheGet; @@ -128,6 +134,7 @@ import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; +import static io.trino.plugin.iceberg.IcebergTableName.tableNameWithType; import static io.trino.plugin.iceberg.IcebergUtil.COLUMN_TRINO_NOT_NULL_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.COLUMN_TRINO_TYPE_ID_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.TRINO_TABLE_METADATA_INFO_VALID_FOR; @@ -136,6 +143,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.getTableComment; import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER; import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getMaterializedViewTableInput; import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getTableInput; @@ -166,6 +174,7 @@ public class TrinoGlueCatalog private final Optional defaultSchemaLocation; private final AWSGlueAsync glueClient; private final GlueMetastoreStats stats; + private final boolean hideMaterializedViewStorageTable; private final Cache glueTableCache = EvictableCacheBuilder.newBuilder() // Even though this is query-scoped, this still needs to be bounded. information_schema queries can access large number of tables. @@ -173,7 +182,7 @@ public class TrinoGlueCatalog .build(); private final Map tableMetadataCache = new ConcurrentHashMap<>(); private final Map viewCache = new ConcurrentHashMap<>(); - private final Map materializedViewCache = new ConcurrentHashMap<>(); + private final Map materializedViewCache = new ConcurrentHashMap<>(); public TrinoGlueCatalog( CatalogName catalogName, @@ -185,9 +194,10 @@ public TrinoGlueCatalog( AWSGlueAsync glueClient, GlueMetastoreStats stats, Optional defaultSchemaLocation, - boolean useUniqueTableLocation) + boolean useUniqueTableLocation, + boolean hideMaterializedViewStorageTable) { - super(catalogName, typeManager, tableOperationsProvider, useUniqueTableLocation); + super(catalogName, typeManager, tableOperationsProvider, fileSystemFactory, useUniqueTableLocation); this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.cacheTableMetadata = cacheTableMetadata; @@ -195,6 +205,7 @@ public TrinoGlueCatalog( this.glueClient = requireNonNull(glueClient, "glueClient is null"); this.stats = requireNonNull(stats, "stats is null"); this.defaultSchemaLocation = requireNonNull(defaultSchemaLocation, "defaultSchemaLocation is null"); + this.hideMaterializedViewStorageTable = hideMaterializedViewStorageTable; } @Override @@ -821,15 +832,7 @@ private Optional getTableAndCacheMetada String metadataLocation = parameters.get(METADATA_LOCATION_PROP); try { // Cache the TableMetadata while we have the Table retrieved anyway - TableOperations operations = tableOperationsProvider.createTableOperations( - this, - session, - schemaTableName.getSchemaName(), - schemaTableName.getTableName(), - Optional.empty(), - Optional.empty()); - FileIO io = operations.io(); - tableMetadataCache.put(schemaTableName, TableMetadataParser.read(io, io.newInputFile(metadataLocation))); + tableMetadataCache.put(schemaTableName, TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation)); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation); @@ -841,8 +844,10 @@ else if (isTrinoMaterializedView(tableType, parameters)) { } try { - createMaterializedViewDefinition(session, schemaTableName, table) - .ifPresent(materializedView -> materializedViewCache.put(schemaTableName, materializedView)); + ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table); + materializedViewCache.put(schemaTableName, new MaterializedViewData( + materializedView, + Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP)))); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache materialized view from %s", schemaTableName); @@ -1144,6 +1149,31 @@ public void createMaterializedView( } } + if (hideMaterializedViewStorageTable) { + Location storageMetadataLocation = createMaterializedViewStorage(session, viewName, definition); + TableInput materializedViewTableInput = getMaterializedViewTableInput( + viewName.getTableName(), + encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(definition)), + session.getUser(), + createMaterializedViewProperties(session, storageMetadataLocation)); + if (existing.isPresent()) { + updateTable(viewName.getSchemaName(), materializedViewTableInput); + } + else { + createTable(viewName.getSchemaName(), materializedViewTableInput); + } + } + else { + createMaterializedViewWithStorageTable(session, viewName, definition, existing); + } + } + + private void createMaterializedViewWithStorageTable( + ConnectorSession session, + SchemaTableName viewName, + ConnectorMaterializedViewDefinition definition, + Optional existing) + { // Create the storage table SchemaTableName storageTable = createMaterializedViewStorageTable(session, viewName, definition); // Create a view indicating the storage table @@ -1195,17 +1225,17 @@ public void updateMaterializedViewColumnComment(ConnectorSession session, Schema definition.getPath(), definition.getProperties()); - updateMaterializedView(session, viewName, newDefinition); + updateMaterializedView(viewName, newDefinition); } - private void updateMaterializedView(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition newDefinition) + private void updateMaterializedView(SchemaTableName viewName, ConnectorMaterializedViewDefinition newDefinition) { + com.amazonaws.services.glue.model.Table table = getTable(viewName, false); TableInput materializedViewTableInput = getMaterializedViewTableInput( viewName.getTableName(), encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(newDefinition)), - session.getUser(), - createMaterializedViewProperties(session, newDefinition.getStorageTable().orElseThrow().getSchemaTableName())); - + table.getOwner(), + getTableParameters(table)); try { updateTable(viewName.getSchemaName(), materializedViewTableInput); } @@ -1247,9 +1277,9 @@ private void dropStorageTable(ConnectorSession session, com.amazonaws.services.g @Override protected Optional doGetMaterializedView(ConnectorSession session, SchemaTableName viewName) { - ConnectorMaterializedViewDefinition materializedViewDefinition = materializedViewCache.get(viewName); - if (materializedViewDefinition != null) { - return Optional.of(materializedViewDefinition); + MaterializedViewData materializedViewData = materializedViewCache.get(viewName); + if (materializedViewData != null) { + return Optional.of(materializedViewData.connectorMaterializedViewDefinition); } if (tableMetadataCache.containsKey(viewName) || viewCache.containsKey(viewName)) { @@ -1267,43 +1297,132 @@ protected Optional doGetMaterializedView(Co return Optional.empty(); } - return createMaterializedViewDefinition(session, viewName, table); + return Optional.of(createMaterializedViewDefinition(session, viewName, table)); } - private Optional createMaterializedViewDefinition( + private ConnectorMaterializedViewDefinition createMaterializedViewDefinition( ConnectorSession session, SchemaTableName viewName, com.amazonaws.services.glue.model.Table table) { Map materializedViewParameters = getTableParameters(table); String storageTable = materializedViewParameters.get(STORAGE_TABLE); - checkState(storageTable != null, "Storage table missing in definition of materialized view " + viewName); - String storageSchema = Optional.ofNullable(materializedViewParameters.get(STORAGE_SCHEMA)) - .orElse(viewName.getSchemaName()); - SchemaTableName storageTableName = new SchemaTableName(storageSchema, storageTable); + String storageMetadataLocation = materializedViewParameters.get(METADATA_LOCATION_PROP); + if ((storageTable == null) == (storageMetadataLocation == null)) { + throw new TrinoException(ICEBERG_BAD_DATA, "Materialized view should have exactly one of the %s properties set: %s".formatted( + ImmutableList.of(STORAGE_TABLE, METADATA_LOCATION_PROP), + materializedViewParameters)); + } + if (storageTable != null) { + String storageSchema = Optional.ofNullable(materializedViewParameters.get(STORAGE_SCHEMA)) + .orElse(viewName.getSchemaName()); + SchemaTableName storageTableName = new SchemaTableName(storageSchema, storageTable); + + Table icebergTable; + try { + icebergTable = loadTable(session, storageTableName); + } + catch (RuntimeException e) { + // The materialized view could be removed concurrently. This may manifest in a number of ways, e.g. + // - io.trino.spi.connector.TableNotFoundException + // - org.apache.iceberg.exceptions.NotFoundException when accessing manifest file + // - other failures when reading storage table's metadata files + // Retry, as we're catching broadly. + throw new MaterializedViewMayBeBeingRemovedException(e); + } + + String viewOriginalText = table.getViewOriginalText(); + if (viewOriginalText == null) { + throw new TrinoException(ICEBERG_BAD_DATA, "Materialized view did not have original text " + viewName); + } + return getMaterializedViewDefinition( + icebergTable, + Optional.ofNullable(table.getOwner()), + viewOriginalText, + storageTableName); + } + + SchemaTableName storageTableName = new SchemaTableName(viewName.getSchemaName(), tableNameWithType(viewName.getTableName(), MATERIALIZED_VIEW_STORAGE)); Table icebergTable; try { - icebergTable = loadTable(session, storageTableName); + TableMetadata metadata = getMaterializedViewTableMetadata(session, storageTableName, storageMetadataLocation); + IcebergTableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + storageTableName.getSchemaName(), + storageTableName.getTableName(), + Optional.empty(), + Optional.empty()); + operations.initializeFromMetadata(metadata); + icebergTable = new BaseTable(operations, quotedTableName(storageTableName), TRINO_METRICS_REPORTER); } catch (RuntimeException e) { // The materialized view could be removed concurrently. This may manifest in a number of ways, e.g. - // - io.trino.spi.connector.TableNotFoundException // - org.apache.iceberg.exceptions.NotFoundException when accessing manifest file // - other failures when reading storage table's metadata files // Retry, as we're catching broadly. throw new MaterializedViewMayBeBeingRemovedException(e); } - String viewOriginalText = table.getViewOriginalText(); - if (viewOriginalText == null) { - throw new TrinoException(ICEBERG_BAD_DATA, "Materialized view did not have original text " + viewName); - } - return Optional.of(getMaterializedViewDefinition( + return getMaterializedViewDefinition( icebergTable, Optional.ofNullable(table.getOwner()), - viewOriginalText, - storageTableName)); + table.getViewOriginalText(), + storageTableName); + } + + @Override + public Optional getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName) + { + String storageMetadataLocation; + MaterializedViewData materializedViewData = materializedViewCache.get(viewName); + if (materializedViewData == null) { + Optional maybeTable = getTableAndCacheMetadata(session, viewName); + if (maybeTable.isEmpty()) { + return Optional.empty(); + } + com.amazonaws.services.glue.model.Table materializedView = maybeTable.get(); + verify(isTrinoMaterializedView(getTableType(materializedView), getTableParameters(materializedView)), + "getMaterializedViewStorageTable received a table, not a materialized view"); + + // TODO getTableAndCacheMetadata saved the value in materializedViewCache, so we could just use that, except when conversion fails + storageMetadataLocation = getTableParameters(materializedView).get(METADATA_LOCATION_PROP); + checkState(storageMetadataLocation != null, "Storage location missing in definition of materialized view " + materializedView.getName()); + } + else { + storageMetadataLocation = materializedViewData.storageMetadataLocation + .orElseThrow(() -> new IllegalStateException("Storage location not defined for materialized view " + viewName)); + } + + SchemaTableName storageTableName = new SchemaTableName(viewName.getSchemaName(), tableNameWithType(viewName.getTableName(), MATERIALIZED_VIEW_STORAGE)); + IcebergTableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + storageTableName.getSchemaName(), + storageTableName.getTableName(), + Optional.empty(), + Optional.empty()); + + try { + TableMetadata metadata = getMaterializedViewTableMetadata(session, storageTableName, storageMetadataLocation); + operations.initializeFromMetadata(metadata); + return Optional.of(new BaseTable(operations, quotedTableName(storageTableName), TRINO_METRICS_REPORTER)); + } + catch (NotFoundException e) { + // Removed during reading + return Optional.empty(); + } + } + + private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session, SchemaTableName storageTableName, String storageMetadataLocation) + { + requireNonNull(storageTableName, "storageTableName is null"); + requireNonNull(storageMetadataLocation, "storageMetadataLocation is null"); + return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> { + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + return TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation); + }); } @Override @@ -1430,4 +1549,15 @@ private void deleteTable(String schema, String table) .withDatabaseName(schema) .withName(table))); } + + private record MaterializedViewData( + ConnectorMaterializedViewDefinition connectorMaterializedViewDefinition, + Optional storageMetadataLocation) + { + private MaterializedViewData + { + requireNonNull(connectorMaterializedViewDefinition, "connectorMaterializedViewDefinition is null"); + requireNonNull(storageMetadataLocation, "storageMetadataLocation is null"); + } + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java index 054c1bbc1c79..2a76128e0290 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java @@ -45,6 +45,7 @@ public class TrinoGlueCatalogFactory private final Optional defaultSchemaLocation; private final AWSGlueAsync glueClient; private final boolean isUniqueTableLocation; + private final boolean hideMaterializedViewStorageTable; private final GlueMetastoreStats stats; @Inject @@ -69,6 +70,7 @@ public TrinoGlueCatalogFactory( this.defaultSchemaLocation = glueConfig.getDefaultWarehouseDir(); this.glueClient = requireNonNull(glueClient, "glueClient is null"); this.isUniqueTableLocation = icebergConfig.isUniqueTableLocation(); + this.hideMaterializedViewStorageTable = icebergConfig.isHideMaterializedViewStorageTable(); this.stats = requireNonNull(stats, "stats is null"); } @@ -92,6 +94,7 @@ public TrinoCatalog create(ConnectorIdentity identity) glueClient, stats, defaultSchemaLocation, - isUniqueTableLocation); + isUniqueTableLocation, + hideMaterializedViewStorageTable); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java index 39b1f8bf23dc..9769819b3d3b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java @@ -38,6 +38,8 @@ import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; +import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage; +import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -71,14 +73,23 @@ protected final String getRefreshedLocation(boolean invalidateCaches) if (invalidateCaches) { metastore.invalidateTable(database, tableName); } - Table table = getTable(); - if (isTrinoView(table) || isTrinoMaterializedView(table)) { + boolean isMaterializedViewStorageTable = isMaterializedViewStorage(tableName); + + Table table; + if (isMaterializedViewStorageTable) { + table = getTable(database, tableNameFrom(tableName)); + } + else { + table = getTable(); + } + + if (!isMaterializedViewStorageTable && (isTrinoView(table) || isTrinoMaterializedView(table))) { // this is a Hive view or Trino/Presto view, or Trino materialized view, hence not a table // TODO table operations should not be constructed for views (remove exception-driven code path) throw new TableNotFoundException(getSchemaTableName()); } - if (!isIcebergTable(table)) { + if (!isMaterializedViewStorageTable && !isIcebergTable(table)) { throw new UnknownTableTypeException(getSchemaTableName()); } @@ -132,6 +143,11 @@ protected Table.Builder updateMetastoreTable(Table.Builder builder, TableMetadat } protected Table getTable() + { + return getTable(database, tableName); + } + + protected Table getTable(String database, String tableName) { return metastore.getTable(database, tableName) .orElseThrow(() -> new TableNotFoundException(getSchemaTableName())); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java index 5a5d4dead2b1..9a2a60e8958a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java @@ -29,13 +29,16 @@ import org.apache.iceberg.io.FileIO; import java.util.Optional; +import java.util.function.BiFunction; import static com.google.common.base.Preconditions.checkState; import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable; +import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom; import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation; import static java.util.Objects.requireNonNull; import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP; @NotThreadSafe public class HiveMetastoreTableOperations @@ -61,15 +64,33 @@ public HiveMetastoreTableOperations( @Override protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) { - String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1); + Table currentTable = getTable(); + commitTableUpdate(currentTable, metadata, (table, newMetadataLocation) -> Table.builder(table) + .apply(builder -> updateMetastoreTable(builder, metadata, newMetadataLocation, Optional.of(currentMetadataLocation))) + .build()); + } + + @Override + protected final void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata) + { + Table materializedView = getTable(database, tableNameFrom(tableName)); + commitTableUpdate(materializedView, metadata, (table, newMetadataLocation) -> Table.builder(table) + .apply(builder -> builder + .setParameter(METADATA_LOCATION_PROP, newMetadataLocation) + .setParameter(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation)) + .build()); + } + private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction tableUpdateFunction) + { + String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1); long lockId = thriftMetastore.acquireTableExclusiveLock( new AcidTransactionOwner(session.getUser()), session.getQueryId(), - database, - tableName); + table.getDatabaseName(), + table.getTableName()); try { - Table currentTable = fromMetastoreApiTable(thriftMetastore.getTable(database, tableName) + Table currentTable = fromMetastoreApiTable(thriftMetastore.getTable(database, table.getTableName()) .orElseThrow(() -> new TableNotFoundException(getSchemaTableName()))); checkState(currentMetadataLocation != null, "No current metadata location for existing table"); @@ -79,14 +100,12 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) currentMetadataLocation, metadataLocation, getSchemaTableName()); } - Table table = Table.builder(currentTable) - .apply(builder -> updateMetastoreTable(builder, metadata, newMetadataLocation, Optional.of(currentMetadataLocation))) - .build(); + Table updatedTable = tableUpdateFunction.apply(table, newMetadataLocation); // todo privileges should not be replaced for an alter PrincipalPrivileges privileges = table.getOwner().map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES); try { - metastore.replaceTable(database, tableName, table, privileges); + metastore.replaceTable(table.getDatabaseName(), table.getTableName(), updatedTable, privileges); } catch (RuntimeException e) { // Cannot determine whether the `replaceTable` operation was successful, @@ -103,7 +122,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) // So, that underlying iceberg API will not do the metadata cleanup, otherwise table will be in unusable state. // If configured and supported, the unreleased lock will be automatically released by the metastore after not hearing a heartbeat for a while, // or otherwise it might need to be manually deleted from the metastore backend storage. - log.error(e, "Failed to release lock %s when committing to table %s", lockId, tableName); + log.error(e, "Failed to release lock %s when committing to table %s", lockId, table.getTableName()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index e010255898c9..394568b4e67f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableSet; import io.airlift.log.Logger; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HiveSchemaProperties; @@ -29,9 +30,12 @@ import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.plugin.hive.util.HiveUtil; +import io.trino.plugin.iceberg.IcebergTableName; import io.trino.plugin.iceberg.UnknownTableTypeException; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.fileio.ForwardingFileIo; import io.trino.spi.TrinoException; import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnMetadata; @@ -53,7 +57,9 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.Transaction; +import org.apache.iceberg.exceptions.NotFoundException; import java.io.IOException; import java.util.Iterator; @@ -68,6 +74,7 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; @@ -86,13 +93,17 @@ import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT; import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; +import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; +import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER; import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT; import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.toHiveColumns; import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; @@ -122,6 +133,7 @@ public class TrinoHiveCatalog private final TrinoFileSystemFactory fileSystemFactory; private final boolean isUsingSystemSecurity; private final boolean deleteSchemaLocationsFallback; + private final boolean hideMaterializedViewStorageTable; private final Map tableMetadataCache = new ConcurrentHashMap<>(); @@ -134,14 +146,16 @@ public TrinoHiveCatalog( IcebergTableOperationsProvider tableOperationsProvider, boolean useUniqueTableLocation, boolean isUsingSystemSecurity, - boolean deleteSchemaLocationsFallback) + boolean deleteSchemaLocationsFallback, + boolean hideMaterializedViewStorageTable) { - super(catalogName, typeManager, tableOperationsProvider, useUniqueTableLocation); + super(catalogName, typeManager, tableOperationsProvider, fileSystemFactory, useUniqueTableLocation); this.metastore = requireNonNull(metastore, "metastore is null"); this.trinoViewHiveMetastore = requireNonNull(trinoViewHiveMetastore, "trinoViewHiveMetastore is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.isUsingSystemSecurity = isUsingSystemSecurity; this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; + this.hideMaterializedViewStorageTable = hideMaterializedViewStorageTable; } public CachingHiveMetastore getMetastore() @@ -539,6 +553,45 @@ public void createMaterializedView( } } + if (hideMaterializedViewStorageTable) { + Location storageMetadataLocation = createMaterializedViewStorage(session, viewName, definition); + + Map viewProperties = createMaterializedViewProperties(session, storageMetadataLocation); + Column dummyColumn = new Column("dummy", HIVE_STRING, Optional.empty(), ImmutableMap.of()); + io.trino.plugin.hive.metastore.Table.Builder tableBuilder = io.trino.plugin.hive.metastore.Table.builder() + .setDatabaseName(viewName.getSchemaName()) + .setTableName(viewName.getTableName()) + .setOwner(isUsingSystemSecurity ? Optional.empty() : Optional.of(session.getUser())) + .setTableType(VIRTUAL_VIEW.name()) + .setDataColumns(ImmutableList.of(dummyColumn)) + .setPartitionColumns(ImmutableList.of()) + .setParameters(viewProperties) + .withStorage(storage -> storage.setStorageFormat(VIEW_STORAGE_FORMAT)) + .withStorage(storage -> storage.setLocation("")) + .setViewOriginalText(Optional.of( + encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(definition)))) + .setViewExpandedText(Optional.of("/* " + ICEBERG_MATERIALIZED_VIEW_COMMENT + " */")); + io.trino.plugin.hive.metastore.Table table = tableBuilder.build(); + PrincipalPrivileges principalPrivileges = isUsingSystemSecurity ? NO_PRIVILEGES : buildInitialPrivilegeSet(session.getUser()); + + if (existing.isPresent()) { + metastore.replaceTable(viewName.getSchemaName(), viewName.getTableName(), table, principalPrivileges); + } + else { + metastore.createTable(table, principalPrivileges); + } + } + else { + createMaterializedViewWithStorageTable(session, viewName, definition, existing); + } + } + + private void createMaterializedViewWithStorageTable( + ConnectorSession session, + SchemaTableName viewName, + ConnectorMaterializedViewDefinition definition, + Optional existing) + { SchemaTableName storageTable = createMaterializedViewStorageTable(session, viewName, definition); // Create a view indicating the storage table @@ -639,6 +692,20 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN log.warn(e, "Failed to drop storage table '%s.%s' for materialized view '%s'", storageSchema, storageTableName, viewName); } } + + String storageMetadataLocation = view.getParameters().get(METADATA_LOCATION_PROP); + checkState(storageMetadataLocation != null, "Storage location missing in definition of materialized view " + viewName); + + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + TableMetadata metadata = TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation); + String storageLocation = metadata.location(); + try { + fileSystem.deleteDirectory(Location.of(storageLocation)); + } + catch (IOException e) { + log.warn(e, "Failed to delete storage location '%s' for materialized view '%s'", storageLocation, viewName); + } + metastore.dropTable(viewName.getSchemaName(), viewName.getTableName(), true); } @@ -650,38 +717,116 @@ protected Optional doGetMaterializedView(Co return Optional.empty(); } - io.trino.plugin.hive.metastore.Table table = tableOptional.get(); - if (!isTrinoMaterializedView(table.getTableType(), table.getParameters())) { + io.trino.plugin.hive.metastore.Table materializedView = tableOptional.get(); + if (!isTrinoMaterializedView(materializedView.getTableType(), materializedView.getParameters())) { return Optional.empty(); } - io.trino.plugin.hive.metastore.Table materializedView = tableOptional.get(); String storageTable = materializedView.getParameters().get(STORAGE_TABLE); - checkState(storageTable != null, "Storage table missing in definition of materialized view " + viewName); - String storageSchema = Optional.ofNullable(materializedView.getParameters().get(STORAGE_SCHEMA)) - .orElse(viewName.getSchemaName()); - SchemaTableName storageTableName = new SchemaTableName(storageSchema, storageTable); + String storageMetadataLocation = materializedView.getParameters().get(METADATA_LOCATION_PROP); + if ((storageTable == null) == (storageMetadataLocation == null)) { + throw new TrinoException(ICEBERG_BAD_DATA, "Materialized view should have exactly one of the %s properties set: %s".formatted( + ImmutableList.of(STORAGE_TABLE, METADATA_LOCATION_PROP), + materializedView.getParameters())); + } + + if (storageTable != null) { + String storageSchema = Optional.ofNullable(materializedView.getParameters().get(STORAGE_SCHEMA)) + .orElse(viewName.getSchemaName()); + SchemaTableName storageTableName = new SchemaTableName(storageSchema, storageTable); + + Table icebergTable; + try { + icebergTable = loadTable(session, storageTableName); + } + catch (RuntimeException e) { + // The materialized view could be removed concurrently. This may manifest in a number of ways, e.g. + // - io.trino.spi.connector.TableNotFoundException + // - org.apache.iceberg.exceptions.NotFoundException when accessing manifest file + // - other failures when reading storage table's metadata files + // Retry, as we're catching broadly. + metastore.invalidateTable(viewName.getSchemaName(), viewName.getTableName()); + metastore.invalidateTable(storageSchema, storageTable); + throw new MaterializedViewMayBeBeingRemovedException(e); + } + return Optional.of(getMaterializedViewDefinition( + icebergTable, + materializedView.getOwner(), + materializedView.getViewOriginalText() + .orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "No view original text: " + viewName)), + storageTableName)); + } - Table icebergTable; + SchemaTableName storageTableName = new SchemaTableName(viewName.getSchemaName(), IcebergTableName.tableNameWithType(viewName.getTableName(), MATERIALIZED_VIEW_STORAGE)); + IcebergTableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + storageTableName.getSchemaName(), + storageTableName.getTableName(), + Optional.empty(), + Optional.empty()); try { - icebergTable = loadTable(session, storageTableName); + TableMetadata metadata = getMaterializedViewTableMetadata(session, storageTableName, materializedView); + operations.initializeFromMetadata(metadata); + Table icebergTable = new BaseTable(operations, quotedTableName(storageTableName), TRINO_METRICS_REPORTER); + + return Optional.of(getMaterializedViewDefinition( + icebergTable, + materializedView.getOwner(), + materializedView.getViewOriginalText() + .orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "No view original text: " + viewName)), + storageTableName)); } catch (RuntimeException e) { // The materialized view could be removed concurrently. This may manifest in a number of ways, e.g. - // - io.trino.spi.connector.TableNotFoundException // - org.apache.iceberg.exceptions.NotFoundException when accessing manifest file // - other failures when reading storage table's metadata files // Retry, as we're catching broadly. metastore.invalidateTable(viewName.getSchemaName(), viewName.getTableName()); - metastore.invalidateTable(storageSchema, storageTable); throw new MaterializedViewMayBeBeingRemovedException(e); } - return Optional.of(getMaterializedViewDefinition( - icebergTable, - table.getOwner(), - materializedView.getViewOriginalText() - .orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "No view original text: " + viewName)), - storageTableName)); + } + + @Override + public Optional getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName) + { + Optional tableOptional = metastore.getTable(viewName.getSchemaName(), viewName.getTableName()); + if (tableOptional.isEmpty()) { + return Optional.empty(); + } + + io.trino.plugin.hive.metastore.Table materializedView = tableOptional.get(); + verify(isTrinoMaterializedView(materializedView.getTableType(), materializedView.getParameters()), + "getMaterializedViewStorageTable received a table, not a materialized view"); + + SchemaTableName storageTableName = new SchemaTableName(viewName.getSchemaName(), IcebergTableName.tableNameWithType(viewName.getTableName(), MATERIALIZED_VIEW_STORAGE)); + IcebergTableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + storageTableName.getSchemaName(), + storageTableName.getTableName(), + Optional.empty(), + Optional.empty()); + + try { + TableMetadata metadata = getMaterializedViewTableMetadata(session, storageTableName, materializedView); + operations.initializeFromMetadata(metadata); + return Optional.of(new BaseTable(operations, quotedTableName(storageTableName), TRINO_METRICS_REPORTER)); + } + catch (NotFoundException e) { + // Removed during reading + return Optional.empty(); + } + } + + private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session, SchemaTableName storageTableName, io.trino.plugin.hive.metastore.Table materializedView) + { + return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> { + String storageMetadataLocation = materializedView.getParameters().get(METADATA_LOCATION_PROP); + checkState(storageMetadataLocation != null, "Storage location missing in definition of materialized view " + materializedView.getTableName()); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + return TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation); + }); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java index ae5c8f01857e..252f0cccb087 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java @@ -47,6 +47,7 @@ public class TrinoHiveCatalogFactory private final boolean isUniqueTableLocation; private final boolean isUsingSystemSecurity; private final boolean deleteSchemaLocationsFallback; + private final boolean hideMaterializedViewStorageTable; @Inject public TrinoHiveCatalogFactory( @@ -68,6 +69,7 @@ public TrinoHiveCatalogFactory( this.isUniqueTableLocation = config.isUniqueTableLocation(); this.isUsingSystemSecurity = securityConfig.getSecuritySystem() == SYSTEM; this.deleteSchemaLocationsFallback = config.isDeleteSchemaLocationsFallback(); + this.hideMaterializedViewStorageTable = config.isHideMaterializedViewStorageTable(); } @Override @@ -83,6 +85,7 @@ public TrinoCatalog create(ConnectorIdentity identity) tableOperationsProvider, isUniqueTableLocation, isUsingSystemSecurity, - deleteSchemaLocationsFallback); + deleteSchemaLocationsFallback, + hideMaterializedViewStorageTable); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperations.java index 1b7e3bda1afa..458435828543 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperations.java @@ -67,4 +67,10 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) jdbcClient.alterTable(database, tableName, newMetadataLocation, currentMetadataLocation); shouldRefresh = true; } + + @Override + protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata) + { + throw new UnsupportedOperationException(); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 5d04d469baba..52af89f0fbe0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -91,7 +91,7 @@ public TrinoJdbcCatalog( boolean useUniqueTableLocation, String defaultWarehouseDir) { - super(catalogName, typeManager, tableOperationsProvider, useUniqueTableLocation); + super(catalogName, typeManager, tableOperationsProvider, fileSystemFactory, useUniqueTableLocation); this.jdbcCatalog = requireNonNull(jdbcCatalog, "jdbcCatalog is null"); this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); @@ -405,6 +405,12 @@ protected Optional doGetMaterializedView(Co return Optional.empty(); } + @Override + public Optional getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName) + { + throw new TrinoException(NOT_SUPPORTED, "The Iceberg JDBC catalog does not support materialized views"); + } + @Override public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java index 75be83f8293d..602e526a213e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java @@ -126,6 +126,12 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata) shouldRefresh = true; } + @Override + protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata) + { + throw new UnsupportedOperationException(); + } + private static ContentKey toKey(SchemaTableName tableName) { return ContentKey.of(Namespace.parse(tableName.getSchemaName()), tableName.getTableName()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 7de7de1c58d1..6d5badaa8380 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -81,7 +81,7 @@ public TrinoNessieCatalog( String warehouseLocation, boolean useUniqueTableLocation) { - super(catalogName, typeManager, tableOperationsProvider, useUniqueTableLocation); + super(catalogName, typeManager, tableOperationsProvider, fileSystemFactory, useUniqueTableLocation); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.warehouseLocation = requireNonNull(warehouseLocation, "warehouseLocation is null"); this.nessieClient = requireNonNull(nessieClient, "nessieClient is null"); @@ -396,6 +396,12 @@ public Optional getMaterializedView(Connect return Optional.empty(); } + @Override + public Optional getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName) + { + throw new TrinoException(NOT_SUPPORTED, "The Iceberg Nessie catalog does not support materialized views"); + } + @Override protected Optional doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index a2083f33e362..74927abd75e9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -435,6 +435,12 @@ public Optional getMaterializedView(Connect return Optional.empty(); } + @Override + public Optional getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName) + { + throw new TrinoException(NOT_SUPPORTED, "The Iceberg REST catalog does not support materialized views"); + } + @Override public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java index 03dcb9d109d9..96b2be40424c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java @@ -57,7 +57,7 @@ public InputFile newInputFile(String path, long length) @Override public OutputFile newOutputFile(String path) { - return new ForwardingOutputFile(fileSystem, path); + return new ForwardingOutputFile(fileSystem, Location.of(path)); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingOutputFile.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingOutputFile.java index 40a65d5b7a36..8d0f956688b7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingOutputFile.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingOutputFile.java @@ -33,10 +33,10 @@ public class ForwardingOutputFile private final TrinoFileSystem fileSystem; private final TrinoOutputFile outputFile; - public ForwardingOutputFile(TrinoFileSystem fileSystem, String path) + public ForwardingOutputFile(TrinoFileSystem fileSystem, Location location) { this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); - this.outputFile = fileSystem.newOutputFile(Location.of(path)); + this.outputFile = fileSystem.newOutputFile(location); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 90d403d2b9bc..fa1acfe613d9 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -6701,30 +6701,6 @@ public void testSnapshotSummariesHaveTrinoQueryIdFormatV2() "WHEN MATCHED THEN UPDATE SET b = t.b * 50", tableName, sourceTableName))); } - @Test - public void testMaterializedViewSnapshotSummariesHaveTrinoQueryId() - { - String matViewName = "test_materialized_view_snapshot_query_ids" + randomNameSuffix(); - String sourceTableName = "test_source_table_for_mat_view" + randomNameSuffix(); - assertUpdate(format("CREATE TABLE %s (a bigint, b bigint)", sourceTableName)); - assertUpdate(format("INSERT INTO %s VALUES (1, 1), (1, 4), (2, 2)", sourceTableName), 3); - - // create a materialized view - QueryId matViewCreateQueryId = getDistributedQueryRunner() - .executeWithQueryId(getSession(), format("CREATE MATERIALIZED VIEW %s WITH (partitioning = ARRAY['a']) AS SELECT * FROM %s", matViewName, sourceTableName)) - .getQueryId(); - - // fetch the underlying storage table name so we can inspect its snapshot summary after the REFRESH - // running queries against "materialized_view$snapshots" is not supported - String storageTable = (String) getDistributedQueryRunner() - .execute(getSession(), format("SELECT storage_table FROM system.metadata.materialized_views WHERE name = '%s'", matViewName)) - .getOnlyValue(); - - assertQueryIdStored(storageTable, matViewCreateQueryId); - - assertQueryIdStored(storageTable, executeWithQueryId(format("REFRESH MATERIALIZED VIEW %s", matViewName))); - } - @Override protected OptionalInt maxTableNameLength() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java index 73eaa253a4ac..7c0d00556143 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java @@ -15,32 +15,32 @@ import com.google.common.collect.ImmutableSet; import io.trino.Session; -import io.trino.metadata.MaterializedViewDefinition; -import io.trino.metadata.QualifiedObjectName; -import io.trino.spi.connector.SchemaTableName; +import io.trino.filesystem.Location; +import io.trino.filesystem.local.LocalFileSystem; +import io.trino.plugin.iceberg.fileio.ForwardingFileIo; +import io.trino.spi.QueryId; import io.trino.sql.tree.ExplainType; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedRow; -import io.trino.transaction.TransactionId; -import io.trino.transaction.TransactionManager; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; import org.assertj.core.api.Condition; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.nio.file.Path; import java.util.Optional; import java.util.Set; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.SystemSessionProperties.LEGACY_MATERIALIZED_VIEW_GRACE_PERIOD; import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION; -import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DELETE_TABLE; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DROP_MATERIALIZED_VIEW; -import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.INSERT_TABLE; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.REFRESH_MATERIALIZED_VIEW; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.RENAME_MATERIALIZED_VIEW; -import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.SELECT_COLUMN; -import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.UPDATE_TABLE; import static io.trino.testing.TestingAccessControlManager.privilege; import static io.trino.testing.TestingNames.randomNameSuffix; import static java.lang.String.format; @@ -52,10 +52,10 @@ public abstract class BaseIcebergMaterializedViewTest extends AbstractTestQueryFramework { - protected final String storageSchemaName = "testing_storage_schema_" + randomNameSuffix(); - protected abstract String getSchemaDirectory(); + protected abstract String getStorageMetadataLocation(String materializedViewName); + @BeforeClass public void setUp() { @@ -65,17 +65,13 @@ public void setUp() assertUpdate("CREATE TABLE base_table2 (_varchar VARCHAR, _bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_bigint', '_date'])"); assertUpdate("INSERT INTO base_table2 VALUES ('a', 0, DATE '2019-09-08'), ('a', 1, DATE '2019-09-08'), ('a', 0, DATE '2019-09-09')", 3); - - assertUpdate("CREATE SCHEMA " + storageSchemaName); } @Test public void testShowTables() { assertUpdate("CREATE MATERIALIZED VIEW materialized_view_show_tables_test AS SELECT * FROM base_table1"); - SchemaTableName storageTableName = getStorageTable("materialized_view_show_tables_test"); - - Set expectedTables = ImmutableSet.of("base_table1", "base_table2", "materialized_view_show_tables_test", storageTableName.getTableName()); + Set expectedTables = ImmutableSet.of("base_table1", "base_table2", "materialized_view_show_tables_test"); Set actualTables = computeActual("SHOW TABLES").getOnlyColumnAsSet().stream() .map(String.class::cast) .collect(toImmutableSet()); @@ -107,20 +103,6 @@ public void testMaterializedViewsMetadata() computeActual("CREATE TABLE small_region AS SELECT * FROM tpch.tiny.region LIMIT 1"); computeActual(format("CREATE MATERIALIZED VIEW %s AS SELECT * FROM small_region LIMIT 1", materializedViewName)); - // test storage table name - assertQuery( - format( - "SELECT storage_catalog, storage_schema, CONCAT(storage_schema, '.', storage_table)" + - "FROM system.metadata.materialized_views WHERE schema_name = '%s' AND name = '%s'", - // TODO (https://github.com/trinodb/trino/issues/9039) remove redundant schema_name filter - schemaName, - materializedViewName), - format( - "VALUES ('%s', '%s', '%s')", - catalogName, - schemaName, - getStorageTable(catalogName, schemaName, materializedViewName))); - // test freshness update assertQuery( // TODO (https://github.com/trinodb/trino/issues/9039) remove redundant schema_name filter @@ -182,7 +164,7 @@ public void testShowCreate() "WITH (\n" + " format = 'ORC',\n" + " format_version = 2,\n" + - " location = '" + getSchemaDirectory() + "/st_\\E[0-9a-f]+-[0-9a-f]+\\Q',\n" + + " location = '" + getSchemaDirectory() + "/test_mv_show_create-\\E[0-9a-f]+\\Q',\n" + " orc_bloom_filter_columns = ARRAY['_date'],\n" + " orc_bloom_filter_fpp = 1E-1,\n" + " partitioning = ARRAY['_date'],\n" + @@ -269,22 +251,6 @@ public void testRefreshDenyPermission() assertUpdate("DROP MATERIALIZED VIEW materialized_view_refresh_deny"); } - @Test - public void testRefreshAllowedWithRestrictedStorageTable() - { - assertUpdate("CREATE MATERIALIZED VIEW materialized_view_refresh AS SELECT * FROM base_table1"); - SchemaTableName storageTable = getStorageTable("materialized_view_refresh"); - - assertAccessAllowed( - "REFRESH MATERIALIZED VIEW materialized_view_refresh", - privilege(storageTable.getTableName(), INSERT_TABLE), - privilege(storageTable.getTableName(), DELETE_TABLE), - privilege(storageTable.getTableName(), UPDATE_TABLE), - privilege(storageTable.getTableName(), SELECT_COLUMN)); - - assertUpdate("DROP MATERIALIZED VIEW materialized_view_refresh"); - } - @Test public void testCreateRefreshSelect() { @@ -524,7 +490,7 @@ public void testSqlFeatures() "WITH (\n" + " format = 'PARQUET',\n" + " format_version = 2,\n" + - " location = '" + getSchemaDirectory() + "/st_\\E[0-9a-f]+-[0-9a-f]+\\Q',\n" + + " location = '" + getSchemaDirectory() + "/materialized_view_window-\\E[0-9a-f]+\\Q',\n" + " partitioning = ARRAY['_date'],\n" + " storage_schema = '" + schema + "'\n" + ") AS\n" + @@ -632,47 +598,6 @@ public void testNestedMaterializedViews() assertUpdate("DROP MATERIALIZED VIEW materialized_view_level2"); } - @Test - public void testStorageSchemaProperty() - { - String schemaName = getSession().getSchema().orElseThrow(); - String viewName = "storage_schema_property_test"; - assertUpdate( - "CREATE MATERIALIZED VIEW " + viewName + " " + - "WITH (storage_schema = '" + storageSchemaName + "') AS " + - "SELECT * FROM base_table1"); - SchemaTableName storageTable = getStorageTable(viewName); - assertThat(storageTable.getSchemaName()).isEqualTo(storageSchemaName); - - assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 6); - assertThat(computeActual("SELECT * FROM " + viewName).getRowCount()).isEqualTo(6); - assertThat(getExplainPlan("SELECT * FROM " + viewName, ExplainType.Type.IO)) - .doesNotContain("base_table1") - .contains(storageSchemaName); - - assertThat((String) computeScalar("SHOW CREATE MATERIALIZED VIEW " + viewName)) - .contains("storage_schema = '" + storageSchemaName + "'"); - - Set storageSchemaTables = computeActual("SHOW TABLES IN " + storageSchemaName).getOnlyColumnAsSet().stream() - .map(String.class::cast) - .collect(toImmutableSet()); - assertThat(storageSchemaTables).contains(storageTable.getTableName()); - - assertUpdate("DROP MATERIALIZED VIEW " + viewName); - storageSchemaTables = computeActual("SHOW TABLES IN " + storageSchemaName).getOnlyColumnAsSet().stream() - .map(String.class::cast) - .collect(toImmutableSet()); - assertThat(storageSchemaTables).doesNotContain(storageTable.getTableName()); - - assertThatThrownBy(() -> query( - "CREATE MATERIALIZED VIEW " + viewName + " " + - "WITH (storage_schema = 'non_existent') AS " + - "SELECT * FROM base_table1")) - .hasMessageContaining("non_existent not found"); - assertThatThrownBy(() -> query("DESCRIBE " + viewName)) - .hasMessageContaining(format("'iceberg.%s.%s' does not exist", schemaName, viewName)); - } - @Test(dataProvider = "testBucketPartitioningDataProvider") public void testBucketPartitioning(String dataType, String exampleValue) { @@ -683,9 +608,11 @@ public void testBucketPartitioning(String dataType, String exampleValue) assertUpdate("CREATE MATERIALIZED VIEW test_bucket_partitioning WITH (partitioning=ARRAY['bucket(col, 4)']) AS SELECT * FROM (VALUES CAST(NULL AS %s), %s) t(col)" .formatted(dataType, exampleValue)); try { - SchemaTableName storageTable = getStorageTable("test_bucket_partitioning"); - assertThat((String) computeScalar("SHOW CREATE TABLE " + storageTable)) - .contains("partitioning = ARRAY['bucket(col, 4)']"); + TableMetadata storageMetadata = getStorageTableMetadata( "test_bucket_partitioning"); + assertThat(storageMetadata.spec().fields()).hasSize(1); + PartitionField bucketPartitionField = getOnlyElement(storageMetadata.spec().fields()); + assertThat(bucketPartitionField.name()).isEqualTo("col_bucket"); + assertThat(bucketPartitionField.transform().toString()).isEqualTo("bucket[4]"); assertThat(query("SELECT * FROM test_bucket_partitioning WHERE col = " + exampleValue)) .matches("SELECT " + exampleValue); @@ -724,9 +651,11 @@ public void testTruncatePartitioning(String dataType, String exampleValue) assertUpdate("CREATE MATERIALIZED VIEW test_truncate_partitioning WITH (partitioning=ARRAY['truncate(col, 4)']) AS SELECT * FROM (VALUES CAST(NULL AS %s), %s) t(col)" .formatted(dataType, exampleValue)); try { - SchemaTableName storageTable = getStorageTable("test_truncate_partitioning"); - assertThat((String) computeScalar("SHOW CREATE TABLE " + storageTable)) - .contains("partitioning = ARRAY['truncate(col, 4)']"); + TableMetadata storageMetadata = getStorageTableMetadata("test_truncate_partitioning"); + assertThat(storageMetadata.spec().fields()).hasSize(1); + PartitionField bucketPartitionField = getOnlyElement(storageMetadata.spec().fields()); + assertThat(bucketPartitionField.name()).isEqualTo("col_trunc"); + assertThat(bucketPartitionField.transform().toString()).isEqualTo("truncate[4]"); assertThat(query("SELECT * FROM test_truncate_partitioning WHERE col = " + exampleValue)) .matches("SELECT " + exampleValue); @@ -759,9 +688,11 @@ public void testTemporalPartitioning(String partitioning, String dataType, Strin assertUpdate("CREATE MATERIALIZED VIEW test_temporal_partitioning WITH (partitioning=ARRAY['%s(col)']) AS SELECT * FROM (VALUES CAST(NULL AS %s), %s) t(col)" .formatted(partitioning, dataType, exampleValue)); try { - SchemaTableName storageTable = getStorageTable("test_temporal_partitioning"); - assertThat((String) computeScalar("SHOW CREATE TABLE " + storageTable)) - .contains("partitioning = ARRAY['%s(col)']".formatted(partitioning)); + TableMetadata storageMetadata = getStorageTableMetadata("test_temporal_partitioning"); + assertThat(storageMetadata.spec().fields()).hasSize(1); + PartitionField bucketPartitionField = getOnlyElement(storageMetadata.spec().fields()); + assertThat(bucketPartitionField.name()).isEqualTo("col_" + partitioning); + assertThat(bucketPartitionField.transform().toString()).isEqualTo(partitioning); assertThat(query("SELECT * FROM test_temporal_partitioning WHERE col = " + exampleValue)) .matches("SELECT " + exampleValue); @@ -789,25 +720,40 @@ public Object[][] testTemporalPartitioningDataProvider() }; } - protected String getColumnComment(String tableName, String columnName) + @Test + public void testMaterializedViewSnapshotSummariesHaveTrinoQueryId() { - return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'"); + String materializedViewName = "test_materialized_view_snapshot_query_ids" + randomNameSuffix(); + String sourceTableName = "test_source_table_for_mat_view" + randomNameSuffix(); + assertUpdate(format("CREATE TABLE %s (a bigint, b bigint)", sourceTableName)); + QueryId matViewCreateQueryId = getDistributedQueryRunner() + .executeWithQueryId(getSession(), format("CREATE MATERIALIZED VIEW %s WITH (partitioning = ARRAY['a']) AS SELECT * FROM %s", materializedViewName, sourceTableName)) + .getQueryId(); + + try { + assertUpdate(format("INSERT INTO %s VALUES (1, 1), (1, 4), (2, 2)", sourceTableName), 3); + + QueryId refreshQueryId = getDistributedQueryRunner() + .executeWithQueryId(getSession(), format("REFRESH MATERIALIZED VIEW %s", materializedViewName)) + .getQueryId(); + String savedQueryId = getStorageTableMetadata(materializedViewName).currentSnapshot().summary().get("trino_query_id"); + assertThat(savedQueryId).isEqualTo(refreshQueryId.getId()); + } + finally { + assertUpdate("DROP TABLE " + sourceTableName); + assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName); + } } - private SchemaTableName getStorageTable(String materializedViewName) + protected String getColumnComment(String tableName, String columnName) { - return getStorageTable(getSession().getCatalog().orElseThrow(), getSession().getSchema().orElseThrow(), materializedViewName); + return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'"); } - private SchemaTableName getStorageTable(String catalogName, String schemaName, String materializedViewName) + private TableMetadata getStorageTableMetadata(String materializedViewName) { - TransactionManager transactionManager = getQueryRunner().getTransactionManager(); - TransactionId transactionId = transactionManager.beginTransaction(false); - Session session = getSession().beginTransactionId(transactionId, transactionManager, getQueryRunner().getAccessControl()); - Optional materializedView = getQueryRunner().getMetadata() - .getMaterializedView(session, new QualifiedObjectName(catalogName, schemaName, materializedViewName)); - assertThat(materializedView).isPresent(); - return materializedView.get().getStorageTable().get().getSchemaTableName(); + Location metadataLocation = Location.of(getStorageMetadataLocation(materializedViewName)); + return TableMetadataParser.read(new ForwardingFileIo(new LocalFileSystem(Path.of(metadataLocation.parentDirectory().toString()))), "local:///" + metadataLocation); } private long getLatestSnapshotId(String tableName) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index 5cfcb285f73a..89a5d16e14f4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -17,6 +17,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.plugin.hive.HiveCompressionCodec; +import jakarta.validation.constraints.AssertFalse; import org.junit.jupiter.api.Test; import java.util.Map; @@ -24,6 +25,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.testing.ValidationAssertions.assertFailsValidation; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.plugin.hive.HiveCompressionCodec.ZSTD; @@ -59,6 +61,7 @@ public void testDefaults() .setDeleteSchemaLocationsFallback(false) .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)) .setMinimumAssignedSplitWeight(0.05) + .setHideMaterializedViewStorageTable(true) .setMaterializedViewsStorageSchema(null) .setRegisterTableProcedureEnabled(false) .setSortedWritingEnabled(true) @@ -87,6 +90,7 @@ public void testExplicitPropertyMappings() .put("iceberg.delete-schema-locations-fallback", "true") .put("iceberg.target-max-file-size", "1MB") .put("iceberg.minimum-assigned-split-weight", "0.01") + .put("iceberg.materialized-views.hide-storage-table", "false") .put("iceberg.materialized-views.storage-schema", "mv_storage_schema") .put("iceberg.register-table-procedure.enabled", "true") .put("iceberg.sorted-writing-enabled", "false") @@ -112,6 +116,7 @@ public void testExplicitPropertyMappings() .setDeleteSchemaLocationsFallback(true) .setTargetMaxFileSize(DataSize.of(1, MEGABYTE)) .setMinimumAssignedSplitWeight(0.01) + .setHideMaterializedViewStorageTable(false) .setMaterializedViewsStorageSchema("mv_storage_schema") .setRegisterTableProcedureEnabled(true) .setSortedWritingEnabled(false) @@ -119,4 +124,16 @@ public void testExplicitPropertyMappings() assertFullMapping(properties, expected); } + + @Test + public void testValidation() + { + assertFailsValidation( + new IcebergConfig() + .setHideMaterializedViewStorageTable(true) + .setMaterializedViewsStorageSchema("storage_schema"), + "storageSchemaSetWhenHidingIsEnabled", + "iceberg.materialized-views.storage-schema may only be set when iceberg.materialized-views.hide-storage-table is set to false", + AssertFalse.class); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedView.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedView.java index 59ba415c1dab..c694902ccd80 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedView.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedView.java @@ -14,26 +14,40 @@ package io.trino.plugin.iceberg; import io.trino.Session; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.Table; import io.trino.sql.tree.ExplainType; import io.trino.testing.DistributedQueryRunner; import org.testng.annotations.Test; +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Map; import static io.trino.plugin.base.util.Closables.closeAllSuppress; -import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static io.trino.plugin.hive.metastore.file.TestingFileHiveMetastore.createTestingFileHiveMetastore; +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; import static org.assertj.core.api.Assertions.assertThat; public class TestIcebergMaterializedView extends BaseIcebergMaterializedViewTest { private Session secondIceberg; + private String fileMetastoreDirectory; + private HiveMetastore metastore; @Override protected DistributedQueryRunner createQueryRunner() throws Exception { - DistributedQueryRunner queryRunner = createIcebergQueryRunner(); + File metastoreDir = Files.createTempDirectory("test_iceberg_table_smoke_test").toFile(); + metastoreDir.deleteOnExit(); + this.fileMetastoreDirectory = metastoreDir.getAbsolutePath(); + this.metastore = createTestingFileHiveMetastore(metastoreDir); + DistributedQueryRunner queryRunner = IcebergQueryRunner.builder() + .setMetastoreDirectory(metastoreDir) + .build(); try { queryRunner.createCatalog("iceberg2", "iceberg", Map.of( "iceberg.catalog.type", "TESTING_FILE_METASTORE", @@ -56,7 +70,14 @@ protected DistributedQueryRunner createQueryRunner() @Override protected String getSchemaDirectory() { - return getDistributedQueryRunner().getCoordinator().getBaseDataDir().resolve("iceberg_data/tpch").toString(); + return Path.of(fileMetastoreDirectory, "tpch").toString(); + } + + @Override + protected String getStorageMetadataLocation(String materializedViewName) + { + Table table = metastore.getTable("tpch", materializedViewName).orElseThrow(); + return table.getParameters().get(METADATA_LOCATION_PROP); } @Test @@ -83,7 +104,7 @@ AS SELECT sum(value) AS s FROM iceberg2.tpch.common_base_table // After REFRESH, the MV is fresh assertUpdate(defaultIceberg, "REFRESH MATERIALIZED VIEW mv_on_iceberg2", 1); assertThat(getExplainPlan("TABLE mv_on_iceberg2", ExplainType.Type.IO)) - .contains("\"table\" : \"st_") + .contains("\"table\" : \"mv_on_iceberg2$materialized_view_storage") .doesNotContain("common_base_table"); assertThat(query("TABLE mv_on_iceberg2")) .matches("VALUES BIGINT '10'"); @@ -91,7 +112,7 @@ AS SELECT sum(value) AS s FROM iceberg2.tpch.common_base_table // After INSERT to the base table, the MV is still fresh, because it currently does not detect changes to tables in other catalog. assertUpdate(secondIceberg, "INSERT INTO common_base_table VALUES 7", 1); assertThat(getExplainPlan("TABLE mv_on_iceberg2", ExplainType.Type.IO)) - .contains("\"table\" : \"st_") + .contains("\"table\" : \"mv_on_iceberg2$materialized_view_storage") .doesNotContain("common_base_table"); assertThat(query("TABLE mv_on_iceberg2")) .matches("VALUES BIGINT '10'"); @@ -99,7 +120,7 @@ AS SELECT sum(value) AS s FROM iceberg2.tpch.common_base_table // After REFRESH, the MV is fresh again assertUpdate(defaultIceberg, "REFRESH MATERIALIZED VIEW mv_on_iceberg2", 1); assertThat(getExplainPlan("TABLE mv_on_iceberg2", ExplainType.Type.IO)) - .contains("\"table\" : \"st_") + .contains("\"table\" : \"mv_on_iceberg2$materialized_view_storage") .doesNotContain("common_base_table"); assertThat(query("TABLE mv_on_iceberg2")) .matches("VALUES BIGINT '17'"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java index 2889f355fe63..0997b3768eff 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMergeAppend.java @@ -62,7 +62,8 @@ protected QueryRunner createQueryRunner() tableOperationsProvider, false, false, - false); + false, + new IcebergConfig().isHideMaterializedViewStorageTable()); return queryRunner; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataListing.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataListing.java index 7ec95cbdd7af..d05e92225477 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataListing.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataListing.java @@ -107,7 +107,6 @@ public void testTableListing() "iceberg_table1", "iceberg_table2", "iceberg_materialized_view", - storageTable.getTableName(), "iceberg_view", "hive_table", "hive_view"); @@ -118,7 +117,6 @@ public void testTableListing() "'iceberg_table1', " + "'iceberg_table2', " + "'iceberg_materialized_view', " + - "'" + storageTable.getTableName() + "', " + "'iceberg_view', " + "'hive_table', " + "'hive_view'"); @@ -136,8 +134,6 @@ public void testTableColumnListing() "('iceberg_table2', '_double'), " + "('iceberg_materialized_view', '_string'), " + "('iceberg_materialized_view', '_integer'), " + - "('" + storageTable.getTableName() + "', '_string'), " + - "('" + storageTable.getTableName() + "', '_integer'), " + "('iceberg_view', '_string'), " + "('iceberg_view', '_integer'), " + "('hive_view', '_double')"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index 2d46297e5b2b..005a0cfeccce 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -45,6 +45,7 @@ import static io.trino.plugin.iceberg.TableType.FILES; import static io.trino.plugin.iceberg.TableType.HISTORY; import static io.trino.plugin.iceberg.TableType.MANIFESTS; +import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; import static io.trino.plugin.iceberg.TableType.PARTITIONS; import static io.trino.plugin.iceberg.TableType.PROPERTIES; import static io.trino.plugin.iceberg.TableType.REFS; @@ -223,7 +224,7 @@ public void testSelectFromMaterializedView() assertMetastoreInvocations("SELECT * FROM test_select_mview_view", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 3) + .addCopies(GET_TABLE, 2) .build()); } @@ -235,7 +236,7 @@ public void testSelectFromMaterializedViewWithFilter() assertMetastoreInvocations("SELECT * FROM test_select_mview_where_view WHERE age = 2", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 3) + .addCopies(GET_TABLE, 2) .build()); } @@ -247,7 +248,7 @@ public void testRefreshMaterializedView() assertMetastoreInvocations("REFRESH MATERIALIZED VIEW test_refresh_mview_view", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 6) + .addCopies(GET_TABLE, 2) .addCopies(REPLACE_TABLE, 1) .build()); } @@ -349,9 +350,12 @@ public void testSelectSystemTable() .addCopies(GET_TABLE, 1) .build()); + assertQueryFails("SELECT * FROM \"test_select_snapshots$materialized_view_storage\"", + "Table 'test_schema.test_select_snapshots\\$materialized_view_storage' not found"); + // This test should get updated if a new system table is added. assertThat(TableType.values()) - .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS); + .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); } @Test @@ -491,33 +495,33 @@ public void testSystemMetadataMaterializedViews() assertMetastoreInvocations(session, "SELECT * FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA", ImmutableMultiset.builder() .add(GET_TABLES_WITH_PARAMETER) - .addCopies(GET_TABLE, 6) + .addCopies(GET_TABLE, 4) .build()); // Bulk retrieval without selecting freshness assertMetastoreInvocations(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA", ImmutableMultiset.builder() .add(GET_TABLES_WITH_PARAMETER) - .addCopies(GET_TABLE, 4) + .addCopies(GET_TABLE, 2) .build()); // Bulk retrieval for two schemas assertMetastoreInvocations(session, "SELECT * FROM system.metadata.materialized_views WHERE schema_name IN (CURRENT_SCHEMA, 'non_existent')", ImmutableMultiset.builder() .addCopies(GET_TABLES_WITH_PARAMETER, 2) - .addCopies(GET_TABLE, 6) + .addCopies(GET_TABLE, 4) .build()); // Pointed lookup assertMetastoreInvocations(session, "SELECT * FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 4) + .addCopies(GET_TABLE, 3) .build()); // Pointed lookup without selecting freshness assertMetastoreInvocations(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .add(GET_TABLE) .build()); assertUpdate("DROP SCHEMA " + schemaName + " CASCADE"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java index 5c7a6e77c155..c19c1349713c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergOrcMetricsCollection.java @@ -98,7 +98,8 @@ protected QueryRunner createQueryRunner() tableOperationsProvider, false, false, - false); + false, + new IcebergConfig().isHideMaterializedViewStorageTable()); queryRunner.installPlugin(new TpchPlugin()); queryRunner.createCatalog("tpch", "tpch"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index 10aa9f8e3553..2e7812aacda5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -122,7 +122,8 @@ protected QueryRunner createQueryRunner() new FileMetastoreTableOperationsProvider(fileSystemFactory), false, false, - false); + false, + new IcebergConfig().isHideMaterializedViewStorageTable()); return queryRunner; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index afd14a0ef64b..f8433034db31 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -1036,7 +1036,8 @@ private BaseTable loadTable(String tableName) tableOperationsProvider, false, false, - false); + false, + new IcebergConfig().isHideMaterializedViewStorageTable()); return (BaseTable) loadIcebergTable(catalog, tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName)); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestTrinoHiveCatalogWithFileMetastore.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestTrinoHiveCatalogWithFileMetastore.java index daf79a1af9ad..5a703b2ce2d6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestTrinoHiveCatalogWithFileMetastore.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestTrinoHiveCatalogWithFileMetastore.java @@ -18,6 +18,7 @@ import io.trino.plugin.hive.TrinoViewHiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; +import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.catalog.BaseTrinoCatalogTest; import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; @@ -75,6 +76,7 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations) new FileMetastoreTableOperationsProvider(fileSystemFactory), useUniqueTableLocations, false, - false); + false, + new IcebergConfig().isHideMaterializedViewStorageTable()); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index b65004be19fa..e29fb2454c73 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -52,6 +52,7 @@ import static io.trino.plugin.iceberg.TableType.FILES; import static io.trino.plugin.iceberg.TableType.HISTORY; import static io.trino.plugin.iceberg.TableType.MANIFESTS; +import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE; import static io.trino.plugin.iceberg.TableType.PARTITIONS; import static io.trino.plugin.iceberg.TableType.PROPERTIES; import static io.trino.plugin.iceberg.TableType.REFS; @@ -265,7 +266,7 @@ public void testSelectFromMaterializedView() assertGlueMetastoreApiInvocations("SELECT * FROM test_select_mview_view", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 3) + .addCopies(GET_TABLE, 2) .build()); } finally { @@ -283,7 +284,7 @@ public void testSelectFromMaterializedViewWithFilter() assertGlueMetastoreApiInvocations("SELECT * FROM test_select_mview_where_view WHERE age = 2", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 3) + .addCopies(GET_TABLE, 2) .build()); } finally { @@ -301,7 +302,7 @@ public void testRefreshMaterializedView() assertGlueMetastoreApiInvocations("REFRESH MATERIALIZED VIEW test_refresh_mview_view", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 5) + .addCopies(GET_TABLE, 4) .addCopies(UPDATE_TABLE, 1) .build()); } @@ -441,9 +442,12 @@ public void testSelectSystemTable() .addCopies(GET_TABLE, 1) .build()); + assertQueryFails("SELECT * FROM \"test_select_snapshots$materialized_view_storage\"", + "Table '" + testSchema + ".test_select_snapshots\\$materialized_view_storage' not found"); + // This test should get updated if a new system table is added. assertThat(TableType.values()) - .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS); + .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); } finally { getQueryRunner().execute("DROP TABLE IF EXISTS test_select_snapshots"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogMaterializedView.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogMaterializedView.java index 95b1d315d7ef..f3d54ce2deed 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogMaterializedView.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogMaterializedView.java @@ -17,6 +17,7 @@ import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; import com.amazonaws.services.glue.model.BatchDeleteTableRequest; import com.amazonaws.services.glue.model.DeleteDatabaseRequest; +import com.amazonaws.services.glue.model.GetTableRequest; import com.amazonaws.services.glue.model.GetTablesRequest; import com.amazonaws.services.glue.model.GetTablesResult; import com.amazonaws.services.glue.model.Table; @@ -36,7 +37,9 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.plugin.hive.metastore.glue.converter.GlueToTrinoConverter.getTableParameters; import static io.trino.testing.TestingNames.randomNameSuffix; +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; public class TestIcebergGlueCatalogMaterializedView extends BaseIcebergMaterializedViewTest @@ -71,11 +74,21 @@ protected String getSchemaDirectory() return new File(schemaDirectory, schemaName + ".db").getPath(); } + @Override + protected String getStorageMetadataLocation(String materializedViewName) + { + AWSGlueAsync glueClient = AWSGlueAsyncClientBuilder.defaultClient(); + Table table = glueClient.getTable(new GetTableRequest() + .withDatabaseName(schemaName) + .withName(materializedViewName)) + .getTable(); + return getTableParameters(table).get(METADATA_LOCATION_PROP); + } + @AfterClass(alwaysRun = true) public void cleanup() { cleanUpSchema(schemaName); - cleanUpSchema(storageSchemaName); } private static void cleanUpSchema(String schema) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java index f69cef509c6b..c44dc03ee138 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java @@ -25,6 +25,7 @@ import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; import io.trino.plugin.iceberg.CommitTaskData; +import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergMetadata; import io.trino.plugin.iceberg.TableStatisticsWriter; import io.trino.plugin.iceberg.catalog.BaseTrinoCatalogTest; @@ -79,7 +80,8 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations) glueClient, new GlueMetastoreStats(), Optional.empty(), - useUniqueTableLocations); + useUniqueTableLocations, + new IcebergConfig().isHideMaterializedViewStorageTable()); } /** @@ -157,7 +159,8 @@ public void testDefaultLocation() glueClient, new GlueMetastoreStats(), Optional.of(tmpDirectory.toAbsolutePath().toString()), - false); + false, + new IcebergConfig().isHideMaterializedViewStorageTable()); String namespace = "test_default_location_" + randomNameSuffix(); String table = "tableName"; diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestTrinoHiveCatalogWithHiveMetastore.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestTrinoHiveCatalogWithHiveMetastore.java index c5e1454dca6b..05b4331231ed 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestTrinoHiveCatalogWithHiveMetastore.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/hms/TestTrinoHiveCatalogWithHiveMetastore.java @@ -33,6 +33,7 @@ import io.trino.plugin.hive.metastore.thrift.ThriftMetastore; import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig; import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreFactory; +import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergSchemaProperties; import io.trino.plugin.iceberg.catalog.BaseTrinoCatalogTest; import io.trino.plugin.iceberg.catalog.TrinoCatalog; @@ -129,7 +130,8 @@ public ThriftMetastore createMetastore(Optional identity) }), useUniqueTableLocations, false, - false); + false, + new IcebergConfig().isHideMaterializedViewStorageTable()); } @Override diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergHiveMetadataListing.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergHiveMetadataListing.java index 7564761a75a0..f2e4c4e6c87e 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergHiveMetadataListing.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergHiveMetadataListing.java @@ -22,7 +22,6 @@ import java.util.List; -import static com.google.common.collect.Iterators.getOnlyElement; import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.tests.product.TestGroups.HMS_ONLY; import static io.trino.tests.product.TestGroups.ICEBERG; @@ -34,7 +33,6 @@ public class TestIcebergHiveMetadataListing extends ProductTest { - private String storageTable; private List preexistingTables; private List preexistingColumns; @@ -55,11 +53,6 @@ public void setUp() onTrino().executeQuery("CREATE TABLE iceberg.default.iceberg_table1 (_string VARCHAR, _integer INTEGER)"); onTrino().executeQuery("CREATE MATERIALIZED VIEW iceberg.default.iceberg_materialized_view AS " + "SELECT * FROM iceberg.default.iceberg_table1"); - storageTable = getOnlyElement(onTrino().executeQuery("SHOW TABLES FROM iceberg.default") - .column(1).stream() - .map(String.class::cast) - .filter(name -> name.startsWith("st_")) - .iterator()); onTrino().executeQuery("CREATE TABLE hive.default.hive_table (_double DOUBLE)"); onTrino().executeQuery("CREATE VIEW hive.default.hive_view AS SELECT * FROM hive.default.hive_table"); @@ -84,7 +77,6 @@ public void testTableListing() .addAll(preexistingTables) .add(row("iceberg_table1")) .add(row("iceberg_materialized_view")) - .add(row(storageTable)) .add(row("iceberg_view")) .add(row("hive_table")) // Iceberg connector supports Trino views created via Hive connector @@ -104,8 +96,6 @@ public void testColumnListing() .add(row("iceberg_table1", "_integer")) .add(row("iceberg_materialized_view", "_string")) .add(row("iceberg_materialized_view", "_integer")) - .add(row(storageTable, "_string")) - .add(row(storageTable, "_integer")) .add(row("iceberg_view", "_string")) .add(row("iceberg_view", "_integer")) .add(row("hive_view", "_double"))