diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index abb0dd35ec0b..b39fb07ead1e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -32,7 +32,6 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.base.CatalogName; import io.trino.plugin.base.projection.ApplyProjectionUtil; @@ -127,7 +126,6 @@ import io.trino.spi.type.VarcharType; import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; -import org.apache.hadoop.fs.Path; import java.io.FileNotFoundException; import java.io.IOException; @@ -1833,7 +1831,7 @@ public Optional finishCreateTable(ConnectorSession sess tableStatistics = new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of()); } - Optional writePath = Optional.of(new Path(writeInfo.writePath().toString())); + Optional writePath = Optional.of(writeInfo.writePath()); if (handle.getPartitionedBy().isEmpty()) { List fileNames; if (partitionUpdates.isEmpty()) { @@ -2208,7 +2206,7 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc session, table, principalPrivileges, - Optional.of(new Path(partitionUpdate.getWritePath().toString())), + Optional.of(partitionUpdate.getWritePath()), Optional.of(partitionUpdate.getFileNames()), false, partitionStatistics, @@ -2268,8 +2266,8 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode if (handle.getLocationHandle().getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) { removeNonCurrentQueryFiles(session, partitionUpdate.getTargetPath()); if (handle.isRetriesEnabled()) { - HdfsContext hdfsContext = new HdfsContext(session); - cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, session.getQueryId(), partitionUpdate.getTargetPath(), ImmutableSet.copyOf(partitionUpdate.getFileNames())); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + cleanExtraOutputFiles(fileSystem, session.getQueryId(), partitionUpdate.getTargetPath(), ImmutableSet.copyOf(partitionUpdate.getFileNames())); } } else { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java index 991fdd36310b..278430f1b3c3 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadataFactory.java @@ -238,7 +238,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm DirectoryLister directoryLister = transactionScopeCachingDirectoryListerFactory.get(this.directoryLister); SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore( - hdfsEnvironment, + fileSystemFactory, hiveMetastoreClosure, fileSystemExecutor, dropExecutor, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index 2d3707773dac..3a1ff3f923b1 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -23,11 +23,15 @@ import com.google.common.collect.Lists; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.concurrent.GuardedBy; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeException; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.airlift.units.Duration; +import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; -import io.trino.hdfs.HdfsContext; -import io.trino.hdfs.HdfsEnvironment; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.hive.thrift.metastore.DataOperationType; import io.trino.plugin.hive.HiveBasicStatistics; import io.trino.plugin.hive.HiveColumnStatisticType; @@ -45,7 +49,6 @@ import io.trino.plugin.hive.acid.AcidTransaction; import io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege; import io.trino.plugin.hive.security.SqlStandardAccessControlMetadataMetastore; -import io.trino.plugin.hive.util.RetryDriver; import io.trino.plugin.hive.util.ValidTxnWriteIdList; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; @@ -53,14 +56,10 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.security.PrincipalType; import io.trino.spi.security.RoleGrant; import io.trino.spi.type.Type; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import java.io.FileNotFoundException; import java.io.IOException; @@ -68,9 +67,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -114,10 +111,7 @@ import static io.trino.plugin.hive.util.AcidTables.isTransactionalTable; import static io.trino.plugin.hive.util.HiveUtil.makePartName; import static io.trino.plugin.hive.util.HiveUtil.toPartitionValues; -import static io.trino.plugin.hive.util.HiveWriteUtils.checkedDelete; -import static io.trino.plugin.hive.util.HiveWriteUtils.createDirectory; import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery; -import static io.trino.plugin.hive.util.HiveWriteUtils.pathExists; import static io.trino.plugin.hive.util.Statistics.ReduceOperator.SUBTRACT; import static io.trino.plugin.hive.util.Statistics.merge; import static io.trino.plugin.hive.util.Statistics.reduce; @@ -144,16 +138,18 @@ public class SemiTransactionalHiveMetastore private static final int PARTITION_COMMIT_BATCH_SIZE = 20; private static final Pattern DELTA_DIRECTORY_MATCHER = Pattern.compile("(delete_)?delta_[\\d]+_[\\d]+_[\\d]+$"); - private static final RetryDriver DELETE_RETRY = RetryDriver.retry() - .maxAttempts(3) - .exponentialBackoff(new Duration(1, SECONDS), new Duration(1, SECONDS), new Duration(10, SECONDS), 2.0); + private static final RetryPolicy DELETE_RETRY_POLICY = RetryPolicy.builder() + .withDelay(java.time.Duration.ofSeconds(1)) + .withMaxDuration(java.time.Duration.ofSeconds(30)) + .withMaxAttempts(3) + .build(); private static final Map ACID_OPERATION_ACTION_TYPES = ImmutableMap.of( 
AcidOperation.INSERT, ActionType.INSERT_EXISTING, AcidOperation.MERGE, ActionType.MERGE); private final HiveMetastoreClosure delegate; - private final HdfsEnvironment hdfsEnvironment; + private final TrinoFileSystemFactory fileSystemFactory; private final Executor fileSystemExecutor; private final Executor dropExecutor; private final Executor updateExecutor; @@ -190,7 +186,7 @@ public class SemiTransactionalHiveMetastore private Optional currentHiveTransaction = Optional.empty(); public SemiTransactionalHiveMetastore( - HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, HiveMetastoreClosure delegate, Executor fileSystemExecutor, Executor dropExecutor, @@ -202,7 +198,7 @@ public SemiTransactionalHiveMetastore( ScheduledExecutorService heartbeatService, TableInvalidationCallback tableInvalidationCallback) { - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.delegate = requireNonNull(delegate, "delegate is null"); this.fileSystemExecutor = requireNonNull(fileSystemExecutor, "fileSystemExecutor is null"); this.dropExecutor = requireNonNull(dropExecutor, "dropExecutor is null"); @@ -447,7 +443,7 @@ public synchronized void createDatabase(ConnectorSession session, Database datab "Database '%s' does not have correct query id set", database.getDatabaseName()); - setExclusive((delegate, hdfsEnvironment) -> { + setExclusive(delegate -> { try { delegate.createDatabase(database); } @@ -471,7 +467,7 @@ private static boolean isCreatedBy(Database database, String queryId) public synchronized void dropDatabase(ConnectorSession session, String schemaName) { - setExclusive((delegate, hdfsEnvironment) -> { + setExclusive(delegate -> { boolean deleteData = shouldDeleteDatabaseData(session, schemaName); delegate.dropDatabase(schemaName, deleteData); }); @@ -479,20 +475,21 @@ public synchronized void dropDatabase(ConnectorSession session, String schemaNam public boolean shouldDeleteDatabaseData(ConnectorSession session, String schemaName) { - Optional location = delegate.getDatabase(schemaName) + Optional location = delegate.getDatabase(schemaName) .orElseThrow(() -> new SchemaNotFoundException(schemaName)) .getLocation() - .map(Path::new); + .map(Location::of); // If we see files in the schema location, don't delete it. // If we see no files, request deletion. // If we fail to check the schema location, behave according to fallback. 
return location.map(path -> { try { - return !hdfsEnvironment.getFileSystem(new HdfsContext(session), path) - .listLocatedStatus(path).hasNext(); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + return !fileSystem.listFiles(path).hasNext() && + fileSystem.listDirectories(path).isEmpty(); } - catch (IOException | RuntimeException e) { + catch (IOException e) { log.warn(e, "Could not check schema directory '%s'", path); return deleteSchemaLocationsFallback; } @@ -501,19 +498,19 @@ public boolean shouldDeleteDatabaseData(ConnectorSession session, String schemaN public synchronized void renameDatabase(String source, String target) { - setExclusive((delegate, hdfsEnvironment) -> delegate.renameDatabase(source, target)); + setExclusive(delegate -> delegate.renameDatabase(source, target)); } public synchronized void setDatabaseOwner(String source, HivePrincipal principal) { - setExclusive((delegate, hdfsEnvironment) -> delegate.setDatabaseOwner(source, principal)); + setExclusive(delegate -> delegate.setDatabaseOwner(source, principal)); } // TODO: Allow updating statistics for 2 tables in the same transaction public synchronized void setTableStatistics(Table table, PartitionStatistics tableStatistics) { AcidTransaction transaction = currentHiveTransaction.isPresent() ? currentHiveTransaction.get().getTransaction() : NO_ACID_TRANSACTION; - setExclusive((delegate, hdfsEnvironment) -> + setExclusive(delegate -> delegate.updateTableStatistics(table.getDatabaseName(), table.getTableName(), transaction, statistics -> updatePartitionStatistics(statistics, tableStatistics))); } @@ -524,7 +521,7 @@ public synchronized void setPartitionStatistics(Table table, Map, P toImmutableMap( entry -> getPartitionName(table, entry.getKey()), entry -> oldPartitionStats -> updatePartitionStatistics(oldPartitionStats, entry.getValue()))); - setExclusive((delegate, hdfsEnvironment) -> + setExclusive(delegate -> delegate.updatePartitionStatistics( table.getDatabaseName(), table.getTableName(), @@ -566,7 +563,7 @@ public synchronized void createTable( ConnectorSession session, Table table, PrincipalPrivileges principalPrivileges, - Optional currentPath, + Optional currentLocation, Optional> files, boolean ignoreExisting, PartitionStatistics statistics, @@ -576,19 +573,17 @@ public synchronized void createTable( // When creating a table, it should never have partition actions. This is just a sanity check. 
checkNoPartitionAction(table.getDatabaseName(), table.getTableName()); Action oldTableAction = tableActions.get(table.getSchemaTableName()); - TableAndMore tableAndMore = new TableAndMore(table, Optional.of(principalPrivileges), currentPath, files, ignoreExisting, statistics, statistics, cleanExtraOutputFilesOnCommit); + TableAndMore tableAndMore = new TableAndMore(table, Optional.of(principalPrivileges), currentLocation, files, ignoreExisting, statistics, statistics, cleanExtraOutputFilesOnCommit); if (oldTableAction == null) { - HdfsContext hdfsContext = new HdfsContext(session); - tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ADD, tableAndMore, hdfsContext, session.getQueryId())); + tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ADD, tableAndMore, session.getIdentity(), session.getQueryId())); return; } switch (oldTableAction.getType()) { case DROP: - if (!oldTableAction.getHdfsContext().getIdentity().getUser().equals(session.getUser())) { + if (!oldTableAction.getIdentity().getUser().equals(session.getUser())) { throw new TrinoException(TRANSACTION_CONFLICT, "Operation on the same table with different user in the same transaction is not supported"); } - HdfsContext hdfsContext = new HdfsContext(session); - tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ALTER, tableAndMore, hdfsContext, session.getQueryId())); + tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ALTER, tableAndMore, session.getIdentity(), session.getQueryId())); return; case ADD: @@ -611,8 +606,7 @@ public synchronized void dropTable(ConnectorSession session, String databaseName SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName); Action oldTableAction = tableActions.get(schemaTableName); if (oldTableAction == null || oldTableAction.getType() == ActionType.ALTER) { - HdfsContext hdfsContext = new HdfsContext(session); - tableActions.put(schemaTableName, new Action<>(ActionType.DROP, null, hdfsContext, session.getQueryId())); + tableActions.put(schemaTableName, new Action<>(ActionType.DROP, null, session.getIdentity(), session.getQueryId())); return; } switch (oldTableAction.getType()) { @@ -632,12 +626,12 @@ public synchronized void dropTable(ConnectorSession session, String databaseName public synchronized void replaceTable(String databaseName, String tableName, Table table, PrincipalPrivileges principalPrivileges) { - setExclusive((delegate, hdfsEnvironment) -> delegate.replaceTable(databaseName, tableName, table, principalPrivileges)); + setExclusive(delegate -> delegate.replaceTable(databaseName, tableName, table, principalPrivileges)); } public synchronized void renameTable(String databaseName, String tableName, String newDatabaseName, String newTableName) { - setExclusive((delegate, hdfsEnvironment) -> { + setExclusive(delegate -> { Optional oldTable = delegate.getTable(databaseName, tableName); try { delegate.renameTable(databaseName, tableName, newDatabaseName, newTableName); @@ -651,32 +645,32 @@ public synchronized void renameTable(String databaseName, String tableName, Stri public synchronized void commentTable(String databaseName, String tableName, Optional comment) { - setExclusive((delegate, hdfsEnvironment) -> delegate.commentTable(databaseName, tableName, comment)); + setExclusive(delegate -> delegate.commentTable(databaseName, tableName, comment)); } public synchronized void setTableOwner(String schema, String table, HivePrincipal principal) { - setExclusive((delegate, hdfsEnvironment) -> 
delegate.setTableOwner(schema, table, principal)); + setExclusive(delegate -> delegate.setTableOwner(schema, table, principal)); } public synchronized void commentColumn(String databaseName, String tableName, String columnName, Optional comment) { - setExclusive((delegate, hdfsEnvironment) -> delegate.commentColumn(databaseName, tableName, columnName, comment)); + setExclusive(delegate -> delegate.commentColumn(databaseName, tableName, columnName, comment)); } public synchronized void addColumn(String databaseName, String tableName, String columnName, HiveType columnType, String columnComment) { - setExclusive((delegate, hdfsEnvironment) -> delegate.addColumn(databaseName, tableName, columnName, columnType, columnComment)); + setExclusive(delegate -> delegate.addColumn(databaseName, tableName, columnName, columnType, columnComment)); } public synchronized void renameColumn(String databaseName, String tableName, String oldColumnName, String newColumnName) { - setExclusive((delegate, hdfsEnvironment) -> delegate.renameColumn(databaseName, tableName, oldColumnName, newColumnName)); + setExclusive(delegate -> delegate.renameColumn(databaseName, tableName, oldColumnName, newColumnName)); } public synchronized void dropColumn(String databaseName, String tableName, String columnName) { - setExclusive((delegate, hdfsEnvironment) -> delegate.dropColumn(databaseName, tableName, columnName)); + setExclusive(delegate -> delegate.dropColumn(databaseName, tableName, columnName)); } public synchronized void finishChangingExistingTable( @@ -701,7 +695,6 @@ public synchronized void finishChangingExistingTable( table = Table.builder(table).setWriteId(OptionalLong.of(currentHiveTransaction.orElseThrow().getTransaction().getWriteId())).build(); } PartitionStatistics currentStatistics = getTableStatistics(databaseName, tableName, Optional.empty()); - HdfsContext hdfsContext = new HdfsContext(session); tableActions.put( schemaTableName, new Action<>( @@ -709,13 +702,13 @@ public synchronized void finishChangingExistingTable( new TableAndMore( table, Optional.empty(), - Optional.of(new Path(currentLocation.toString())), + Optional.of(currentLocation), Optional.of(fileNames), false, merge(currentStatistics, statisticsUpdate), statisticsUpdate, cleanExtraOutputFilesOnCommit), - hdfsContext, + session.getIdentity(), session.getQueryId())); return; } @@ -753,10 +746,10 @@ public synchronized void truncateUnpartitionedTable(ConnectorSession session, St throw new IllegalArgumentException("Table is partitioned"); } - Path path = new Path(table.getStorage().getLocation()); - HdfsContext context = new HdfsContext(session); - setExclusive((delegate, hdfsEnvironment) -> { - RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles(hdfsEnvironment, context, path, ImmutableSet.of(""), false); + Location location = Location.of(table.getStorage().getLocation()); + TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity()); + setExclusive(delegate -> { + RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles(fileSystem, location, ImmutableSet.of(""), false); if (!recursiveDeleteResult.getNotDeletedEligibleItems().isEmpty()) { throw new TrinoException(HIVE_FILESYSTEM_ERROR, format( "Error deleting from unpartitioned table %s. 
These items cannot be deleted: %s", @@ -786,7 +779,6 @@ public synchronized void finishMerge( Action oldTableAction = tableActions.get(schemaTableName); if (oldTableAction == null) { Table table = getExistingTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()); - HdfsContext hdfsContext = new HdfsContext(session); PrincipalPrivileges principalPrivileges = table.getOwner().isEmpty() ? NO_PRIVILEGES : buildInitialPrivilegeSet(table.getOwner().get()); tableActions.put( @@ -796,10 +788,10 @@ public synchronized void finishMerge( new TableAndMergeResults( table, Optional.of(principalPrivileges), - Optional.of(new Path(currentLocation.toString())), + Optional.of(currentLocation), partitionUpdateAndMergeResults, partitions), - hdfsContext, + session.getIdentity(), session.getQueryId())); return; } @@ -981,22 +973,21 @@ public synchronized void addPartition( checkArgument(getQueryId(partition).isPresent()); Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>()); Action oldPartitionAction = partitionActionsOfTable.get(partition.getValues()); - HdfsContext hdfsContext = new HdfsContext(session); if (oldPartitionAction == null) { partitionActionsOfTable.put( partition.getValues(), - new Action<>(ActionType.ADD, new PartitionAndMore(partition, currentLocation, files, statistics, statistics, cleanExtraOutputFilesOnCommit), hdfsContext, session.getQueryId())); + new Action<>(ActionType.ADD, new PartitionAndMore(partition, currentLocation, files, statistics, statistics, cleanExtraOutputFilesOnCommit), session.getIdentity(), session.getQueryId())); return; } switch (oldPartitionAction.getType()) { case DROP: case DROP_PRESERVE_DATA: - if (!oldPartitionAction.getHdfsContext().getIdentity().getUser().equals(session.getUser())) { + if (!oldPartitionAction.getIdentity().getUser().equals(session.getUser())) { throw new TrinoException(TRANSACTION_CONFLICT, "Operation on the same partition with different user in the same transaction is not supported"); } partitionActionsOfTable.put( partition.getValues(), - new Action<>(ActionType.ALTER, new PartitionAndMore(partition, currentLocation, files, statistics, statistics, cleanExtraOutputFilesOnCommit), hdfsContext, session.getQueryId())); + new Action<>(ActionType.ALTER, new PartitionAndMore(partition, currentLocation, files, statistics, statistics, cleanExtraOutputFilesOnCommit), session.getIdentity(), session.getQueryId())); return; case ADD: case ALTER: @@ -1013,12 +1004,11 @@ public synchronized void dropPartition(ConnectorSession session, String database Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>()); Action oldPartitionAction = partitionActionsOfTable.get(partitionValues); if (oldPartitionAction == null) { - HdfsContext hdfsContext = new HdfsContext(session); if (deleteData) { - partitionActionsOfTable.put(partitionValues, new Action<>(ActionType.DROP, null, hdfsContext, session.getQueryId())); + partitionActionsOfTable.put(partitionValues, new Action<>(ActionType.DROP, null, session.getIdentity(), session.getQueryId())); } else { - partitionActionsOfTable.put(partitionValues, new Action<>(ActionType.DROP_PRESERVE_DATA, null, hdfsContext, session.getQueryId())); + partitionActionsOfTable.put(partitionValues, new Action<>(ActionType.DROP_PRESERVE_DATA, null, session.getIdentity(), session.getQueryId())); } return; } @@ -1046,17 +1036,14 @@ public synchronized void 
finishInsertIntoExistingPartitions( { setShared(); SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName); - HdfsContext context = new HdfsContext(session); Map, Action> partitionActionsOfTable = partitionActions.computeIfAbsent(schemaTableName, k -> new HashMap<>()); for (PartitionUpdateInfo partitionInfo : partitionUpdateInfos) { Action oldPartitionAction = partitionActionsOfTable.get(partitionInfo.partitionValues); if (oldPartitionAction != null) { switch (oldPartitionAction.getType()) { - case DROP, DROP_PRESERVE_DATA -> - throw new PartitionNotFoundException(schemaTableName, partitionInfo.partitionValues); - case ADD, ALTER, INSERT_EXISTING, MERGE -> - throw new UnsupportedOperationException("Inserting into a partition that were added, altered, or inserted into in the same transaction is not supported"); + case DROP, DROP_PRESERVE_DATA -> throw new PartitionNotFoundException(schemaTableName, partitionInfo.partitionValues); + case ADD, ALTER, INSERT_EXISTING, MERGE -> throw new UnsupportedOperationException("Inserting into a partition that were added, altered, or inserted into in the same transaction is not supported"); default -> throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType()); } } @@ -1102,7 +1089,7 @@ public synchronized void finishInsertIntoExistingPartitions( merge(currentStatistics, partitionInfo.statisticsUpdate), partitionInfo.statisticsUpdate, cleanExtraOutputFilesOnCommit), - context, + session.getIdentity(), session.getQueryId())); } } @@ -1132,13 +1119,13 @@ private static String getPartitionName(Table table, List partitionValues @Override public synchronized void createRole(String role, String grantor) { - setExclusive((delegate, hdfsEnvironment) -> delegate.createRole(role, grantor)); + setExclusive(delegate -> delegate.createRole(role, grantor)); } @Override public synchronized void dropRole(String role) { - setExclusive((delegate, hdfsEnvironment) -> delegate.dropRole(role)); + setExclusive(delegate -> delegate.dropRole(role)); } @Override @@ -1151,13 +1138,13 @@ public synchronized Set listRoles() @Override public synchronized void grantRoles(Set roles, Set grantees, boolean adminOption, HivePrincipal grantor) { - setExclusive((delegate, hdfsEnvironment) -> delegate.grantRoles(roles, grantees, adminOption, grantor)); + setExclusive(delegate -> delegate.grantRoles(roles, grantees, adminOption, grantor)); } @Override public synchronized void revokeRoles(Set roles, Set grantees, boolean adminOption, HivePrincipal grantor) { - setExclusive((delegate, hdfsEnvironment) -> delegate.revokeRoles(roles, grantees, adminOption, grantor)); + setExclusive(delegate -> delegate.revokeRoles(roles, grantees, adminOption, grantor)); } @Override @@ -1238,13 +1225,13 @@ private Table getExistingTable(String databaseName, String tableName) @Override public synchronized void grantTablePrivileges(String databaseName, String tableName, HivePrincipal grantee, HivePrincipal grantor, Set privileges, boolean grantOption) { - setExclusive((delegate, hdfsEnvironment) -> delegate.grantTablePrivileges(databaseName, tableName, getRequiredTableOwner(databaseName, tableName), grantee, grantor, privileges, grantOption)); + setExclusive(delegate -> delegate.grantTablePrivileges(databaseName, tableName, getRequiredTableOwner(databaseName, tableName), grantee, grantor, privileges, grantOption)); } @Override public synchronized void revokeTablePrivileges(String databaseName, String tableName, HivePrincipal grantee, HivePrincipal grantor, Set 
privileges, boolean grantOption) { - setExclusive((delegate, hdfsEnvironment) -> delegate.revokeTablePrivileges(databaseName, tableName, getRequiredTableOwner(databaseName, tableName), grantee, grantor, privileges, grantOption)); + setExclusive(delegate -> delegate.revokeTablePrivileges(databaseName, tableName, getRequiredTableOwner(databaseName, tableName), grantee, grantor, privileges, grantOption)); } public synchronized String declareIntentionToWrite(ConnectorSession session, WriteMode writeMode, Location stagingPathRoot, SchemaTableName schemaTableName) @@ -1256,11 +1243,11 @@ public synchronized String declareIntentionToWrite(ConnectorSession session, Wri throw new TrinoException(NOT_SUPPORTED, "Cannot insert into a table with a partition that has been modified in the same transaction when Trino is configured to skip temporary directories."); } } - HdfsContext hdfsContext = new HdfsContext(session); + ConnectorIdentity identity = session.getIdentity(); String queryId = session.getQueryId(); String declarationId = queryId + "_" + declaredIntentionsToWriteCounter; declaredIntentionsToWriteCounter++; - declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(declarationId, writeMode, hdfsContext, queryId, new Path(stagingPathRoot.toString()), schemaTableName)); + declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(declarationId, writeMode, identity, queryId, stagingPathRoot, schemaTableName)); return declarationId; } @@ -1288,7 +1275,7 @@ public synchronized void commit() return; case EXCLUSIVE_OPERATION_BUFFERED: requireNonNull(bufferedExclusiveOperation, "bufferedExclusiveOperation is null"); - bufferedExclusiveOperation.execute(delegate, hdfsEnvironment); + bufferedExclusiveOperation.execute(delegate); return; case FINISHED: throw new IllegalStateException("Tried to commit buffered metastore operations after transaction has been committed/aborted"); @@ -1532,16 +1519,16 @@ private void commitShared() committer.prepareDropTable(schemaTableName); break; case ALTER: - committer.prepareAlterTable(action.getHdfsContext(), action.getQueryId(), action.getData()); + committer.prepareAlterTable(action.getIdentity(), action.getQueryId(), action.getData()); break; case ADD: - committer.prepareAddTable(action.getHdfsContext(), action.getQueryId(), action.getData()); + committer.prepareAddTable(action.getIdentity(), action.getQueryId(), action.getData()); break; case INSERT_EXISTING: - committer.prepareInsertExistingTable(action.getHdfsContext(), action.getQueryId(), action.getData()); + committer.prepareInsertExistingTable(action.getIdentity(), action.getQueryId(), action.getData()); break; case MERGE: - committer.prepareMergeExistingTable(action.getHdfsContext(), action.getData()); + committer.prepareMergeExistingTable(action.getIdentity(), action.getData()); break; default: throw new IllegalStateException("Unknown action type: " + action.getType()); @@ -1560,16 +1547,16 @@ private void commitShared() committer.prepareDropPartition(schemaTableName, partitionValues, false); break; case ALTER: - committer.prepareAlterPartition(action.getHdfsContext(), action.getQueryId(), action.getData()); + committer.prepareAlterPartition(action.getIdentity(), action.getQueryId(), action.getData()); break; case ADD: - committer.prepareAddPartition(action.getHdfsContext(), action.getQueryId(), action.getData()); + committer.prepareAddPartition(action.getIdentity(), action.getQueryId(), action.getData()); break; case INSERT_EXISTING: - committer.prepareInsertExistingPartition(action.getHdfsContext(), 
action.getQueryId(), action.getData()); + committer.prepareInsertExistingPartition(action.getIdentity(), action.getQueryId(), action.getData()); break; case MERGE: - committer.prepareInsertExistingPartition(action.getHdfsContext(), action.getQueryId(), action.getData()); + committer.prepareInsertExistingPartition(action.getIdentity(), action.getQueryId(), action.getData()); break; default: throw new IllegalStateException("Unknown action type: " + action.getType()); @@ -1698,19 +1685,18 @@ private void prepareDropTable(SchemaTableName schemaTableName) })); } - private void prepareAlterTable(HdfsContext hdfsContext, String queryId, TableAndMore tableAndMore) + private void prepareAlterTable(ConnectorIdentity identity, String queryId, TableAndMore tableAndMore) { deleteOnly = false; Table table = tableAndMore.getTable(); - String targetLocation = table.getStorage().getLocation(); + Location targetLocation = Location.of(table.getStorage().getLocation()); Table oldTable = delegate.getTable(table.getDatabaseName(), table.getTableName()) .orElseThrow(() -> new TrinoException(TRANSACTION_CONFLICT, "The table that this transaction modified was deleted in another transaction. " + table.getSchemaTableName())); - String oldTableLocation = oldTable.getStorage().getLocation(); - Path oldTablePath = new Path(oldTableLocation); + Location oldTableLocation = Location.of(oldTable.getStorage().getLocation()); tablesToInvalidate.add(oldTable); - cleanExtraOutputFiles(hdfsContext, queryId, tableAndMore); + cleanExtraOutputFiles(identity, queryId, tableAndMore); // Location of the old table and the new table can be different because we allow arbitrary directories through LocationService. // If the location of the old table is the same as the location of the new table: @@ -1720,33 +1706,31 @@ private void prepareAlterTable(HdfsContext hdfsContext, String queryId, TableAnd // Otherwise, // * Remember we will need to delete the location of the old partition at the end if transaction successfully commits if (targetLocation.equals(oldTableLocation)) { - Path oldTableStagingPath = new Path(oldTablePath.getParent(), "_temp_" + oldTablePath.getName() + "_" + queryId); + Location location = asFileLocation(oldTableLocation); + Location oldTableStagingPath = location.parentDirectory().appendPath("_temp_" + location.fileName() + "_" + queryId); renameDirectory( - hdfsContext, - hdfsEnvironment, - oldTablePath, + fileSystemFactory.create(identity), + oldTableLocation, oldTableStagingPath, - () -> renameTasksForAbort.add(new DirectoryRenameTask(hdfsContext, oldTableStagingPath, oldTablePath))); + () -> renameTasksForAbort.add(new DirectoryRenameTask(identity, oldTableStagingPath, oldTableLocation))); if (!skipDeletionForAlter) { - deletionTasksForFinish.add(new DirectoryDeletionTask(hdfsContext, oldTableStagingPath)); + deletionTasksForFinish.add(new DirectoryDeletionTask(identity, oldTableStagingPath)); } } else { if (!skipDeletionForAlter) { - deletionTasksForFinish.add(new DirectoryDeletionTask(hdfsContext, oldTablePath)); + deletionTasksForFinish.add(new DirectoryDeletionTask(identity, oldTableLocation)); } } - Path currentPath = tableAndMore.getCurrentLocation() + Location currentLocation = tableAndMore.getCurrentLocation() .orElseThrow(() -> new IllegalArgumentException("location should be present for alter table")); - Path targetPath = new Path(targetLocation); - if (!targetPath.equals(currentPath)) { + if (!targetLocation.equals(currentLocation)) { renameDirectory( - hdfsContext, - hdfsEnvironment, - currentPath, - 
targetPath, - () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true))); + fileSystemFactory.create(identity), + currentLocation, + targetLocation, + () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(identity, targetLocation, true))); } // Partition alter must happen regardless of whether original and current location is the same // because metadata might change: e.g. storage format, column types, etc @@ -1759,39 +1743,38 @@ private void prepareAlterTable(HdfsContext hdfsContext, String queryId, TableAnd false)); } - private void prepareAddTable(HdfsContext context, String queryId, TableAndMore tableAndMore) + private void prepareAddTable(ConnectorIdentity identity, String queryId, TableAndMore tableAndMore) { deleteOnly = false; - cleanExtraOutputFiles(context, queryId, tableAndMore); + cleanExtraOutputFiles(identity, queryId, tableAndMore); Table table = tableAndMore.getTable(); if (table.getTableType().equals(MANAGED_TABLE.name())) { - Optional targetLocation = table.getStorage().getOptionalLocation(); + Optional targetLocation = table.getStorage().getOptionalLocation().map(Location::of); if (targetLocation.isPresent()) { - checkArgument(!targetLocation.get().isEmpty(), "target location is empty"); - Optional currentPath = tableAndMore.getCurrentLocation(); - Path targetPath = new Path(targetLocation.get()); - if (table.getPartitionColumns().isEmpty() && currentPath.isPresent()) { + Optional currentLocation = tableAndMore.getCurrentLocation(); + Location targetPath = targetLocation.get(); + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + if (table.getPartitionColumns().isEmpty() && currentLocation.isPresent()) { // CREATE TABLE AS SELECT unpartitioned table - if (targetPath.equals(currentPath.get())) { + if (targetPath.equals(currentLocation.get())) { // Target path and current path are the same. Therefore, directory move is not needed. } else { renameDirectory( - context, - hdfsEnvironment, - currentPath.get(), + fileSystem, + currentLocation.get(), targetPath, - () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true))); + () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(identity, targetPath, true))); } } else { // CREATE TABLE AS SELECT partitioned table, or // CREATE TABLE partitioned/unpartitioned table (without data) - if (pathExists(context, hdfsEnvironment, targetPath)) { - if (currentPath.isPresent() && currentPath.get().equals(targetPath)) { - // It is okay to skip directory creation when currentPath is equal to targetPath + if (directoryExists(fileSystem, targetPath)) { + if (currentLocation.isPresent() && currentLocation.get().equals(targetPath)) { + // It is okay to skip directory creation when currentLocation is equal to targetPath // because the directory may have been created when creating partition directories. // However, it is important to note that the two being equal does not guarantee // a directory had been created. 
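
Note: prepareAddTable now routes its exists-then-create handling for the target directory through the TrinoFileSystem API instead of HdfsEnvironment. The directoryExists/createDirectory helpers it calls are not part of this excerpt; the following is a minimal sketch of what such helpers could look like against TrinoFileSystem (class name, method bodies, and error handling are assumptions for illustration, not code from the patch):

```java
// Illustrative sketch only: assumed shape of the directoryExists/createDirectory helpers
// used by prepareAddTable; their actual bodies are not shown in this diff.
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.spi.TrinoException;

import java.io.IOException;

import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;

final class DirectoryHelpers
{
    private DirectoryHelpers() {}

    static boolean directoryExists(TrinoFileSystem fileSystem, Location location)
    {
        try {
            // Optional.empty() means the file system cannot answer the question
            // (e.g. some blob stores); treat that as "absent" here
            return fileSystem.directoryExists(location).orElse(false);
        }
        catch (IOException e) {
            throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed checking directory: " + location, e);
        }
    }

    static void createDirectory(TrinoFileSystem fileSystem, Location location)
    {
        try {
            fileSystem.createDirectory(location);
        }
        catch (IOException e) {
            throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed creating directory: " + location, e);
        }
    }
}
```
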
@@ -1803,8 +1786,8 @@ private void prepareAddTable(HdfsContext context, String queryId, TableAndMore t } } else { - cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)); - createDirectory(context, hdfsEnvironment, targetPath); + cleanUpTasksForAbort.add(new DirectoryCleanUpTask(identity, targetPath, true)); + createDirectory(fileSystem, targetPath); } } } @@ -1813,22 +1796,23 @@ private void prepareAddTable(HdfsContext context, String queryId, TableAndMore t addTableOperations.add(new CreateTableOperation(table, tableAndMore.getPrincipalPrivileges(), tableAndMore.isIgnoreExisting(), tableAndMore.getStatisticsUpdate())); } - private void prepareInsertExistingTable(HdfsContext context, String queryId, TableAndMore tableAndMore) + private void prepareInsertExistingTable(ConnectorIdentity identity, String queryId, TableAndMore tableAndMore) { deleteOnly = false; Table table = tableAndMore.getTable(); - Path targetPath = new Path(table.getStorage().getLocation()); + Location targetPath = Location.of(table.getStorage().getLocation()); tablesToInvalidate.add(table); - Path currentPath = tableAndMore.getCurrentLocation().orElseThrow(); - cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, false)); + Location currentPath = tableAndMore.getCurrentLocation().orElseThrow(); + cleanUpTasksForAbort.add(new DirectoryCleanUpTask(identity, targetPath, false)); if (!targetPath.equals(currentPath)) { // if staging directory is used we cherry-pick files to be moved - asyncRename(hdfsEnvironment, fileSystemExecutor, fileSystemOperationsCancelled, fileSystemOperationFutures, context, currentPath, targetPath, tableAndMore.getFileNames().orElseThrow()); + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + asyncRename(fileSystem, fileSystemExecutor, fileSystemOperationsCancelled, fileSystemOperationFutures, currentPath, targetPath, tableAndMore.getFileNames().orElseThrow()); } else { // if we inserted directly into table directory we need to remove extra output files which should not be part of the table - cleanExtraOutputFiles(context, queryId, tableAndMore); + cleanExtraOutputFiles(identity, queryId, tableAndMore); } updateStatisticsOperations.add(new UpdateStatisticsOperation( table.getSchemaTableName(), @@ -1842,7 +1826,7 @@ private void prepareInsertExistingTable(HdfsContext context, String queryId, Tab } } - private void prepareMergeExistingTable(HdfsContext context, TableAndMore tableAndMore) + private void prepareMergeExistingTable(ConnectorIdentity identity, TableAndMore tableAndMore) { checkArgument(currentHiveTransaction.isPresent(), "currentHiveTransaction isn't present"); AcidTransaction transaction = currentHiveTransaction.get().getTransaction(); @@ -1850,11 +1834,12 @@ private void prepareMergeExistingTable(HdfsContext context, TableAndMore tableAn deleteOnly = false; Table table = tableAndMore.getTable(); - Path targetPath = new Path(table.getStorage().getLocation()); - Path currentPath = tableAndMore.getCurrentLocation().get(); - cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, false)); + Location targetPath = Location.of(table.getStorage().getLocation()); + Location currentPath = tableAndMore.getCurrentLocation().orElseThrow(); + cleanUpTasksForAbort.add(new DirectoryCleanUpTask(identity, targetPath, false)); if (!targetPath.equals(currentPath)) { - asyncRename(hdfsEnvironment, fileSystemExecutor, fileSystemOperationsCancelled, fileSystemOperationFutures, context, currentPath, targetPath, 
tableAndMore.getFileNames().get()); + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + asyncRename(fileSystem, fileSystemExecutor, fileSystemOperationsCancelled, fileSystemOperationFutures, currentPath, targetPath, tableAndMore.getFileNames().get()); } updateStatisticsOperations.add(new UpdateStatisticsOperation( table.getSchemaTableName(), @@ -1881,7 +1866,7 @@ private void prepareDropPartition(SchemaTableName schemaTableName, List })); } - private void prepareAlterPartition(HdfsContext hdfsContext, String queryId, PartitionAndMore partitionAndMore) + private void prepareAlterPartition(ConnectorIdentity identity, String queryId, PartitionAndMore partitionAndMore) { deleteOnly = false; @@ -1895,9 +1880,9 @@ private void prepareAlterPartition(HdfsContext hdfsContext, String queryId, Part String partitionName = getPartitionName(partition.getDatabaseName(), partition.getTableName(), partition.getValues()); PartitionStatistics oldPartitionStatistics = getExistingPartitionStatistics(partition, partitionName); String oldPartitionLocation = oldPartition.getStorage().getLocation(); - Path oldPartitionPath = new Path(oldPartitionLocation); + Location oldPartitionPath = asFileLocation(Location.of(oldPartitionLocation)); - cleanExtraOutputFiles(hdfsContext, queryId, partitionAndMore); + cleanExtraOutputFiles(identity, queryId, partitionAndMore); // Location of the old partition and the new partition can be different because we allow arbitrary directories through LocationService. // If the location of the old partition is the same as the location of the new partition: @@ -1907,32 +1892,30 @@ private void prepareAlterPartition(HdfsContext hdfsContext, String queryId, Part // Otherwise, // * Remember we will need to delete the location of the old partition at the end if transaction successfully commits if (targetLocation.equals(oldPartitionLocation)) { - Path oldPartitionStagingPath = new Path(oldPartitionPath.getParent(), "_temp_" + oldPartitionPath.getName() + "_" + queryId); + Location oldPartitionStagingPath = oldPartitionPath.sibling("_temp_" + oldPartitionPath.fileName() + "_" + queryId); renameDirectory( - hdfsContext, - hdfsEnvironment, + fileSystemFactory.create(identity), oldPartitionPath, oldPartitionStagingPath, - () -> renameTasksForAbort.add(new DirectoryRenameTask(hdfsContext, oldPartitionStagingPath, oldPartitionPath))); + () -> renameTasksForAbort.add(new DirectoryRenameTask(identity, oldPartitionStagingPath, oldPartitionPath))); if (!skipDeletionForAlter) { - deletionTasksForFinish.add(new DirectoryDeletionTask(hdfsContext, oldPartitionStagingPath)); + deletionTasksForFinish.add(new DirectoryDeletionTask(identity, oldPartitionStagingPath)); } } else { if (!skipDeletionForAlter) { - deletionTasksForFinish.add(new DirectoryDeletionTask(hdfsContext, oldPartitionPath)); + deletionTasksForFinish.add(new DirectoryDeletionTask(identity, oldPartitionPath)); } } - Path currentPath = new Path(partitionAndMore.getCurrentLocation().toString()); - Path targetPath = new Path(targetLocation); + Location currentPath = partitionAndMore.getCurrentLocation(); + Location targetPath = Location.of(targetLocation); if (!targetPath.equals(currentPath)) { renameDirectory( - hdfsContext, - hdfsEnvironment, + fileSystemFactory.create(identity), currentPath, targetPath, - () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true))); + () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(identity, targetPath, true))); } // Partition alter must happen regardless of 
whether original and current location is the same // because metadata might change: e.g. storage format, column types, etc @@ -1941,24 +1924,27 @@ private void prepareAlterPartition(HdfsContext hdfsContext, String queryId, Part new PartitionWithStatistics(oldPartition, partitionName, oldPartitionStatistics))); } - private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, PartitionAndMore partitionAndMore) + private void cleanExtraOutputFiles(ConnectorIdentity identity, String queryId, PartitionAndMore partitionAndMore) { if (!partitionAndMore.isCleanExtraOutputFilesOnCommit()) { return; } verify(partitionAndMore.hasFileNames(), "fileNames expected to be set if isCleanExtraOutputFilesOnCommit is true"); - SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, partitionAndMore.getCurrentLocation(), ImmutableSet.copyOf(partitionAndMore.getFileNames())); + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + SemiTransactionalHiveMetastore.cleanExtraOutputFiles(fileSystem, queryId, partitionAndMore.getCurrentLocation(), ImmutableSet.copyOf(partitionAndMore.getFileNames())); } - private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, TableAndMore tableAndMore) + private void cleanExtraOutputFiles(ConnectorIdentity identity, String queryId, TableAndMore tableAndMore) { if (!tableAndMore.isCleanExtraOutputFilesOnCommit()) { return; } - Path tableLocation = tableAndMore.getCurrentLocation().orElseThrow(() -> new IllegalArgumentException("currentLocation expected to be set if isCleanExtraOutputFilesOnCommit is true")); + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + Location tableLocation = tableAndMore.getCurrentLocation().orElseThrow(() -> + new IllegalArgumentException("currentLocation expected to be set if isCleanExtraOutputFilesOnCommit is true")); List files = tableAndMore.getFileNames().orElseThrow(() -> new IllegalArgumentException("fileNames expected to be set if isCleanExtraOutputFilesOnCommit is true")); - SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, Location.of(tableLocation.toString()), ImmutableSet.copyOf(files)); + SemiTransactionalHiveMetastore.cleanExtraOutputFiles(fileSystem, queryId, tableLocation, ImmutableSet.copyOf(files)); } private PartitionStatistics getExistingPartitionStatistics(Partition partition, String partitionName) @@ -1987,16 +1973,16 @@ private PartitionStatistics getExistingPartitionStatistics(Partition partition, } } - private void prepareAddPartition(HdfsContext hdfsContext, String queryId, PartitionAndMore partitionAndMore) + private void prepareAddPartition(ConnectorIdentity identity, String queryId, PartitionAndMore partitionAndMore) { deleteOnly = false; Partition partition = partitionAndMore.getPartition(); String targetLocation = partition.getStorage().getLocation(); - Path currentPath = new Path(partitionAndMore.getCurrentLocation().toString()); - Path targetPath = new Path(targetLocation); + Location currentPath = partitionAndMore.getCurrentLocation(); + Location targetPath = Location.of(targetLocation); - cleanExtraOutputFiles(hdfsContext, queryId, partitionAndMore); + cleanExtraOutputFiles(identity, queryId, partitionAndMore); PartitionAdder partitionAdder = partitionAdders.computeIfAbsent( partition.getSchemaTableName(), @@ -2006,19 +1992,19 @@ private void prepareAddPartition(HdfsContext hdfsContext, String queryId, Partit if (fileSystemOperationsCancelled.get()) { return; } - if 
(pathExists(hdfsContext, hdfsEnvironment, currentPath)) { + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + if (directoryExists(fileSystem, currentPath)) { if (!targetPath.equals(currentPath)) { renameDirectory( - hdfsContext, - hdfsEnvironment, + fileSystem, currentPath, targetPath, - () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true))); + () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(identity, targetPath, true))); } } else { - cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true)); - createDirectory(hdfsContext, hdfsEnvironment, targetPath); + cleanUpTasksForAbort.add(new DirectoryCleanUpTask(identity, targetPath, true)); + createDirectory(fileSystem, targetPath); } }, fileSystemExecutor)); @@ -2026,23 +2012,24 @@ private void prepareAddPartition(HdfsContext hdfsContext, String queryId, Partit partitionAdder.addPartition(new PartitionWithStatistics(partition, partitionName, partitionAndMore.getStatisticsUpdate())); } - private void prepareInsertExistingPartition(HdfsContext hdfsContext, String queryId, PartitionAndMore partitionAndMore) + private void prepareInsertExistingPartition(ConnectorIdentity identity, String queryId, PartitionAndMore partitionAndMore) { deleteOnly = false; Partition partition = partitionAndMore.getPartition(); partitionsToInvalidate.add(partition); - Path targetPath = new Path(partition.getStorage().getLocation()); - Path currentPath = new Path(partitionAndMore.getCurrentLocation().toString()); - cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, false)); + Location targetPath = Location.of(partition.getStorage().getLocation()); + Location currentPath = partitionAndMore.getCurrentLocation(); + cleanUpTasksForAbort.add(new DirectoryCleanUpTask(identity, targetPath, false)); if (!targetPath.equals(currentPath)) { // if staging directory is used we cherry-pick files to be moved - asyncRename(hdfsEnvironment, fileSystemExecutor, fileSystemOperationsCancelled, fileSystemOperationFutures, hdfsContext, currentPath, targetPath, partitionAndMore.getFileNames()); + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + asyncRename(fileSystem, fileSystemExecutor, fileSystemOperationsCancelled, fileSystemOperationFutures, currentPath, targetPath, partitionAndMore.getFileNames()); } else { // if we inserted directly into partition directory we need to remove extra output files which should not be part of the table - cleanExtraOutputFiles(hdfsContext, queryId, partitionAndMore); + cleanExtraOutputFiles(identity, queryId, partitionAndMore); } updateStatisticsOperations.add(new UpdateStatisticsOperation( @@ -2058,15 +2045,19 @@ private void executeCleanupTasksForAbort(Collection de .map(DeclaredIntentionToWrite::getQueryId) .collect(toImmutableSet()); for (DirectoryCleanUpTask cleanUpTask : cleanUpTasksForAbort) { - recursiveDeleteFilesAndLog(cleanUpTask.getContext(), cleanUpTask.getPath(), queryIds, cleanUpTask.isDeleteEmptyDirectory(), "temporary directory commit abort"); + recursiveDeleteFilesAndLog(cleanUpTask.identity(), cleanUpTask.location(), queryIds, cleanUpTask.deleteEmptyDirectory(), "temporary directory commit abort"); } } private void executeDeletionTasksForFinish() { for (DirectoryDeletionTask deletionTask : deletionTasksForFinish) { - if (!deleteRecursivelyIfExists(deletionTask.getContext(), hdfsEnvironment, deletionTask.getPath())) { - logCleanupFailure("Error deleting directory %s", deletionTask.getPath()); + TrinoFileSystem fileSystem 
= fileSystemFactory.create(deletionTask.identity()); + try { + fileSystem.deleteDirectory(deletionTask.location()); + } + catch (IOException e) { + logCleanupFailure(e, "Error deleting directory: %s", deletionTask.location()); } } } @@ -2077,12 +2068,13 @@ private void executeRenameTasksForAbort() try { // Ignore the task if the source directory doesn't exist. // This is probably because the original rename that we are trying to undo here never succeeded. - if (pathExists(directoryRenameTask.getContext(), hdfsEnvironment, directoryRenameTask.getRenameFrom())) { - renameDirectory(directoryRenameTask.getContext(), hdfsEnvironment, directoryRenameTask.getRenameFrom(), directoryRenameTask.getRenameTo(), () -> {}); + TrinoFileSystem fileSystem = fileSystemFactory.create(directoryRenameTask.identity()); + if (directoryExists(fileSystem, directoryRenameTask.renameFrom())) { + renameDirectory(fileSystem, directoryRenameTask.renameFrom(), directoryRenameTask.renameTo(), () -> {}); } } catch (Throwable throwable) { - logCleanupFailure(throwable, "failed to undo rename of partition directory: %s to %s", directoryRenameTask.getRenameFrom(), directoryRenameTask.getRenameTo()); + logCleanupFailure(throwable, "failed to undo rename of partition directory: %s to %s", directoryRenameTask.renameFrom(), directoryRenameTask.renameTo()); } } } @@ -2098,8 +2090,8 @@ private void pruneAndDeleteStagingDirectories(List dec .map(DeclaredIntentionToWrite::getQueryId) .collect(toImmutableSet()); - Path path = declaredIntentionToWrite.getRootPath(); - recursiveDeleteFilesAndLog(declaredIntentionToWrite.getHdfsContext(), path, queryIds, true, "staging directory cleanup"); + Location path = declaredIntentionToWrite.getRootPath(); + recursiveDeleteFilesAndLog(declaredIntentionToWrite.getIdentity(), path, queryIds, true, "staging directory cleanup"); } } @@ -2319,7 +2311,7 @@ private void rollbackShared() break; } - Path rootPath = declaredIntentionToWrite.getRootPath(); + Location rootPath = declaredIntentionToWrite.getRootPath(); // In the case of DIRECT_TO_TARGET_NEW_DIRECTORY, if the directory is not guaranteed to be unique // for the query, it is possible that another query or compute engine may see the directory, wrote @@ -2327,19 +2319,19 @@ private void rollbackShared() // directories must be carried out conservatively. To be safe, we only delete files that start or // end with the query IDs in this transaction. 
recursiveDeleteFilesAndLog( - declaredIntentionToWrite.getHdfsContext(), + declaredIntentionToWrite.getIdentity(), rootPath, ImmutableSet.of(declaredIntentionToWrite.getQueryId()), true, format("staging/target_new directory rollback for table %s", declaredIntentionToWrite.getSchemaTableName())); break; case DIRECT_TO_TARGET_EXISTING_DIRECTORY: - Set pathsToClean = new HashSet<>(); + Set pathsToClean = new HashSet<>(); // Check the base directory of the declared intention // * existing partition may also be in this directory // * this is where new partitions are created - Path baseDirectory = declaredIntentionToWrite.getRootPath(); + Location baseDirectory = declaredIntentionToWrite.getRootPath(); pathsToClean.add(baseDirectory); SchemaTableName schemaTableName = declaredIntentionToWrite.getSchemaTableName(); @@ -2360,11 +2352,10 @@ private void rollbackShared() for (List partitionNameBatch : Iterables.partition(partitionNames, 10)) { Collection> partitions = delegate.getPartitionsByNames(schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionNameBatch).values(); partitions.stream() - .filter(Optional::isPresent) - .map(Optional::get) + .flatMap(Optional::stream) .map(partition -> partition.getStorage().getLocation()) - .map(Path::new) - .filter(path -> !isSameOrParent(baseDirectory, path)) + .filter(path -> !path.startsWith(baseDirectory.toString())) + .map(Location::of) .forEach(pathsToClean::add); } } @@ -2377,11 +2368,10 @@ private void rollbackShared() } // delete any file that starts or ends with the query ID - for (Path path : pathsToClean) { - // TODO: It is a known deficiency that some empty directory does not get cleaned up in S3. + for (Location path : pathsToClean) { // We cannot delete any of the directories here since we do not know who created them. recursiveDeleteFilesAndLog( - declaredIntentionToWrite.getHdfsContext(), + declaredIntentionToWrite.getIdentity(), path, ImmutableSet.of(declaredIntentionToWrite.getQueryId()), false, @@ -2457,19 +2447,6 @@ private void checkNoPartitionAction(String databaseName, String tableName) } } - private static boolean isSameOrParent(Path parent, Path child) - { - int parentDepth = parent.depth(); - int childDepth = child.depth(); - if (parentDepth > childDepth) { - return false; - } - for (int i = childDepth; i > parentDepth; i--) { - child = child.getParent(); - } - return parent.equals(child); - } - @FormatMethod private void logCleanupFailure(String format, Object... args) { @@ -2498,37 +2475,23 @@ private static void addSuppressedExceptions(List suppressedExceptions } private static void asyncRename( - HdfsEnvironment hdfsEnvironment, + TrinoFileSystem fileSystem, Executor executor, AtomicBoolean cancelled, List> fileRenameFutures, - HdfsContext context, - Path currentPath, - Path targetPath, + Location currentPath, + Location targetPath, List fileNames) { - FileSystem fileSystem; - try { - fileSystem = hdfsEnvironment.getFileSystem(context, currentPath); - } - catch (IOException e) { - throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Error moving data files to final location. 
Error listing directory %s", currentPath), e); - } - for (String fileName : fileNames) { - Path source = new Path(currentPath, fileName); - Path target = new Path(targetPath, fileName); + Location source = currentPath.appendPath(fileName); + Location target = targetPath.appendPath(fileName); fileRenameFutures.add(CompletableFuture.runAsync(() -> { if (cancelled.get()) { return; } try { - if (fileSystem.exists(target)) { - throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Error moving data files from %s to final location %s: target location already exists", source, target)); - } - if (!fileSystem.rename(source, target)) { - throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Error moving data files from %s to final location %s: rename not successful", source, target)); - } + fileSystem.renameFile(source, target); } catch (IOException e) { throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Error moving data files from %s to final location %s", source, target), e); @@ -2537,11 +2500,10 @@ private static void asyncRename( } } - private void recursiveDeleteFilesAndLog(HdfsContext context, Path directory, Set queryIds, boolean deleteEmptyDirectories, String reason) + private void recursiveDeleteFilesAndLog(ConnectorIdentity identity, Location directory, Set queryIds, boolean deleteEmptyDirectories, String reason) { RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles( - hdfsEnvironment, - context, + fileSystemFactory.create(identity), directory, queryIds, deleteEmptyDirectories); @@ -2575,13 +2537,10 @@ else if (deleteEmptyDirectories && !recursiveDeleteResult.isDirectoryNoLongerExi * @param queryIds prefix or suffix of files that should be deleted * @param deleteEmptyDirectories whether empty directories should be deleted */ - private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEnvironment, HdfsContext context, Path directory, Set queryIds, boolean deleteEmptyDirectories) + private static RecursiveDeleteResult recursiveDeleteFiles(TrinoFileSystem fileSystem, Location directory, Set queryIds, boolean deleteEmptyDirectories) { - FileSystem fileSystem; try { - fileSystem = hdfsEnvironment.getFileSystem(context, directory); - - if (!fileSystem.exists(directory)) { + if (!fileSystem.directoryExists(directory).orElse(false)) { return new RecursiveDeleteResult(true, ImmutableList.of()); } } @@ -2594,16 +2553,30 @@ private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEn return doRecursiveDeleteFiles(fileSystem, directory, queryIds, deleteEmptyDirectories); } - private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSystem, Path directory, Set queryIds, boolean deleteEmptyDirectories) + private static RecursiveDeleteResult doRecursiveDeleteFiles(TrinoFileSystem fileSystem, Location directory, Set queryIds, boolean deleteEmptyDirectories) { // don't delete hidden Trino directories use by FileHiveMetastore - if (directory.getName().startsWith(".trino")) { + directory = asFileLocation(directory); + if (directory.fileName().startsWith(".trino")) { return new RecursiveDeleteResult(false, ImmutableList.of()); } - FileStatus[] allFiles; + // TODO: this lists recursively but only uses the first level + List allFiles = new ArrayList<>(); + Set allDirectories; try { - allFiles = fileSystem.listStatus(directory); + FileIterator iterator = fileSystem.listFiles(directory); + while (iterator.hasNext()) { + Location location = iterator.next().location(); + String child = 
@@ -2594,16 +2553,30 @@ private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEn
        return doRecursiveDeleteFiles(fileSystem, directory, queryIds, deleteEmptyDirectories);
    }

-    private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSystem, Path directory, Set<String> queryIds, boolean deleteEmptyDirectories)
+    private static RecursiveDeleteResult doRecursiveDeleteFiles(TrinoFileSystem fileSystem, Location directory, Set<String> queryIds, boolean deleteEmptyDirectories)
    {
        // don't delete hidden Trino directories use by FileHiveMetastore
-        if (directory.getName().startsWith(".trino")) {
+        directory = asFileLocation(directory);
+        if (directory.fileName().startsWith(".trino")) {
            return new RecursiveDeleteResult(false, ImmutableList.of());
        }

-        FileStatus[] allFiles;
+        // TODO: this lists recursively but only uses the first level
+        List<Location> allFiles = new ArrayList<>();
+        Set<Location> allDirectories;
        try {
-            allFiles = fileSystem.listStatus(directory);
+            FileIterator iterator = fileSystem.listFiles(directory);
+            while (iterator.hasNext()) {
+                Location location = iterator.next().location();
+                String child = location.toString().substring(directory.toString().length());
+                while (child.startsWith("/")) {
+                    child = child.substring(1);
+                }
+                if (!child.contains("/")) {
+                    allFiles.add(location);
+                }
+            }
+            allDirectories = fileSystem.listDirectories(directory);
        }
        catch (IOException e) {
            ImmutableList.Builder<String> notDeletedItems = ImmutableList.builder();
@@ -2613,44 +2586,37 @@ private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSyste
        boolean allDescendentsDeleted = true;
        ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
-        for (FileStatus fileStatus : allFiles) {
-            if (fileStatus.isFile()) {
-                Path filePath = fileStatus.getPath();
-                String fileName = filePath.getName();
-                boolean eligible = false;
-                // don't delete hidden Trino directories use by FileHiveMetastore
-                if (!fileName.startsWith(".trino")) {
-                    eligible = queryIds.stream().anyMatch(id -> isFileCreatedByQuery(fileName, id));
-                }
-                if (eligible) {
-                    if (!deleteIfExists(fileSystem, filePath, false)) {
-                        allDescendentsDeleted = false;
-                        notDeletedEligibleItems.add(filePath.toString());
-                    }
-                }
-                else {
-                    allDescendentsDeleted = false;
-                }
-            }
-            else if (fileStatus.isDirectory()) {
-                RecursiveDeleteResult subResult = doRecursiveDeleteFiles(fileSystem, fileStatus.getPath(), queryIds, deleteEmptyDirectories);
-                if (!subResult.isDirectoryNoLongerExists()) {
+        for (Location file : allFiles) {
+            String fileName = file.fileName();
+            boolean eligible = false;
+            // don't delete hidden Trino directories use by FileHiveMetastore
+            if (!fileName.startsWith(".trino")) {
+                eligible = queryIds.stream().anyMatch(id -> isFileCreatedByQuery(fileName, id));
+            }
+            if (eligible) {
+                if (!deleteFileIfExists(fileSystem, file)) {
                    allDescendentsDeleted = false;
-                }
-                if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
-                    notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
+                    notDeletedEligibleItems.add(file.toString());
                }
            }
            else {
                allDescendentsDeleted = false;
-                notDeletedEligibleItems.add(fileStatus.getPath().toString());
+            }
+        }
+        for (Location file : allDirectories) {
+            RecursiveDeleteResult subResult = doRecursiveDeleteFiles(fileSystem, file, queryIds, deleteEmptyDirectories);
+            if (!subResult.isDirectoryNoLongerExists()) {
+                allDescendentsDeleted = false;
+            }
+            if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
+                notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
            }
        }

        // Unconditionally delete empty delta_ and delete_delta_ directories, because that's
        // what Hive does, and leaving them in place confuses delta file readers.
-        if (allDescendentsDeleted && (deleteEmptyDirectories || DELTA_DIRECTORY_MATCHER.matcher(directory.getName()).matches())) {
+        if (allDescendentsDeleted && (deleteEmptyDirectories || isDeltaDirectory(directory))) {
            verify(notDeletedEligibleItems.build().isEmpty());
-            if (!deleteIfExists(fileSystem, directory, false)) {
+            if (!deleteEmptyDirectoryIfExists(fileSystem, directory)) {
                return new RecursiveDeleteResult(false, ImmutableList.of(directory + "/"));
            }
            return new RecursiveDeleteResult(true, ImmutableList.of());
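The listing change in doRecursiveDeleteFiles has a subtlety called out by its TODO: TrinoFileSystem.listFiles is recursive, so the code keeps only entries whose path relative to the directory contains no further separator. A self-contained sketch of that filtering, using an illustrative helper name of our own (listImmediateChildren is not part of the patch):

    import io.trino.filesystem.FileIterator;
    import io.trino.filesystem.Location;
    import io.trino.filesystem.TrinoFileSystem;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Keep only the files directly under `directory`, discarding entries from subdirectories.
    static List<Location> listImmediateChildren(TrinoFileSystem fileSystem, Location directory)
            throws IOException
    {
        List<Location> children = new ArrayList<>();
        FileIterator iterator = fileSystem.listFiles(directory); // recursive listing
        while (iterator.hasNext()) {
            Location location = iterator.next().location();
            String relative = location.toString().substring(directory.toString().length());
            while (relative.startsWith("/")) {
                relative = relative.substring(1);
            }
            if (!relative.contains("/")) { // no remaining separator: an immediate child
                children.add(location);
            }
        }
        return children;
    }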
@@ -2658,59 +2624,54 @@ else if (fileStatus.isDirectory()) {
        return new RecursiveDeleteResult(false, notDeletedEligibleItems.build());
    }

-    /**
-     * Attempts to remove the file or empty directory.
-     *
-     * @return true if the location no longer exists
-     */
-    private static boolean deleteIfExists(FileSystem fileSystem, Path path, boolean recursive)
+    private static boolean isDeltaDirectory(Location directory)
    {
-        try {
-            // attempt to delete the path
-            if (fileSystem.delete(path, recursive)) {
-                return true;
-            }
+        return DELTA_DIRECTORY_MATCHER.matcher(asFileLocation(directory).fileName()).matches();
+    }

-            // delete failed
-            // check if path still exists
-            return !fileSystem.exists(path);
+    private static boolean deleteFileIfExists(TrinoFileSystem fileSystem, Location location)
+    {
+        try {
+            fileSystem.deleteFile(location);
+            return true;
        }
-        catch (FileNotFoundException ignored) {
-            // path was already removed or never existed
+        catch (FileNotFoundException e) {
            return true;
        }
-        catch (IOException ignored) {
+        catch (IOException e) {
+            return false;
        }
-        return false;
    }

-    /**
-     * Attempts to remove the file or empty directory.
-     *
-     * @return true if the location no longer exists
-     */
-    private static boolean deleteRecursivelyIfExists(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path)
+    private static boolean deleteEmptyDirectoryIfExists(TrinoFileSystem fileSystem, Location location)
    {
-        FileSystem fileSystem;
        try {
-            fileSystem = hdfsEnvironment.getFileSystem(context, path);
+            if (fileSystem.listFiles(location).hasNext()) {
+                log.warn("Not deleting non-empty directory: %s", location);
+                return false;
+            }
+            fileSystem.deleteDirectory(location);
+            return true;
        }
-        catch (IOException ignored) {
-            return false;
+        catch (IOException e) {
+            try {
+                return !fileSystem.directoryExists(location).orElse(false);
+            }
+            catch (IOException ex) {
+                return false;
+            }
        }
-
-        return deleteIfExists(fileSystem, path, true);
    }

-    private static void renameDirectory(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path source, Path target, Runnable runWhenPathDoesntExist)
+    private static void renameDirectory(TrinoFileSystem fileSystem, Location source, Location target, Runnable runWhenPathDoesntExist)
    {
-        if (pathExists(context, hdfsEnvironment, target)) {
-            throw new TrinoException(HIVE_PATH_ALREADY_EXISTS,
-                    format("Unable to rename from %s to %s: target directory already exists", source, target));
+        if (directoryExists(fileSystem, target)) {
+            throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format("Unable to rename from %s to %s: target directory already exists", source, target));
        }

-        if (!pathExists(context, hdfsEnvironment, target.getParent())) {
-            createDirectory(context, hdfsEnvironment, target.getParent());
+        Location parent = asFileLocation(target).parentDirectory();
+        if (!directoryExists(fileSystem, parent)) {
+            createDirectory(fileSystem, parent);
        }

        // The runnable will assume that if rename fails, it will be okay to delete the directory (if the directory is empty).
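The helpers in this hunk rely on TrinoFileSystem.directoryExists returning Optional<Boolean> rather than a plain boolean: an empty answer means the file system cannot tell (for example object stores without real directories), and the patch consistently treats that as "does not exist" via orElse(false). A small sketch of that convention, with an assumed method name of our own:

    import io.trino.filesystem.Location;
    import io.trino.filesystem.TrinoFileSystem;
    import java.io.IOException;
    import java.io.UncheckedIOException;

    // Mirrors the orElse(false) convention used in the patch.
    static boolean definitelyExists(TrinoFileSystem fileSystem, Location directory)
    {
        try {
            // Optional.empty() means "unknown"; the conservative reading here is "no".
            return fileSystem.directoryExists(directory).orElse(false);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }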
@@ -2718,15 +2679,33 @@ private static void renameDirectory(HdfsContext context, HdfsEnvironment hdfsEnv
        runWhenPathDoesntExist.run();

        try {
-            if (!hdfsEnvironment.getFileSystem(context, source).rename(source, target)) {
-                throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Failed to rename %s to %s: rename returned false", source, target));
-            }
+            fileSystem.renameDirectory(source, target);
        }
        catch (IOException e) {
            throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Failed to rename %s to %s", source, target), e);
        }
    }

+    private static void createDirectory(TrinoFileSystem fileSystem, Location directory)
+    {
+        try {
+            fileSystem.createDirectory(directory);
+        }
+        catch (IOException e) {
+            throw new TrinoException(HIVE_FILESYSTEM_ERROR, e);
+        }
+    }
+
+    private static boolean directoryExists(TrinoFileSystem fileSystem, Location directory)
+    {
+        try {
+            return fileSystem.directoryExists(directory).orElse(false);
+        }
+        catch (IOException e) {
+            throw new TrinoException(HIVE_FILESYSTEM_ERROR, e);
+        }
+    }
+
    private static Optional<String> getQueryId(Database database)
    {
        return Optional.ofNullable(database.getParameters().get(PRESTO_QUERY_ID_NAME));
@@ -2742,6 +2721,16 @@ private static Optional getQueryId(Partition partition)
        return Optional.ofNullable(partition.getParameters().get(PRESTO_QUERY_ID_NAME));
    }

+    private static Location asFileLocation(Location location)
+    {
+        // TODO: this is to work around the file-only restriction of Location methods
+        String value = location.toString();
+        while (value.endsWith("/")) {
+            value = value.substring(0, value.length() - 1);
+        }
+        return Location.of(value);
+    }
+
    private void checkHoldsLock()
    {
        // This method serves a similar purpose at runtime as GuardedBy on method serves during static analysis.
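asFileLocation is the small workaround named in its TODO: Location's file-oriented methods such as fileName() and parentDirectory() are not usable on a location that ends with "/", so trailing slashes are stripped before those methods are called. An illustrative use, with a bucket and path invented for the example:

    import io.trino.filesystem.Location;

    class AsFileLocationExample
    {
        public static void main(String[] args)
        {
            Location directoryStyle = Location.of("s3://bucket/warehouse/orders/");
            // fileName() is restricted to file-style locations, so strip the trailing slash first
            Location fileStyle = Location.of(directoryStyle.toString().replaceAll("/+$", ""));
            System.out.println(fileStyle.fileName());        // orders
            System.out.println(fileStyle.parentDirectory()); // s3://bucket/warehouse
        }
    }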
@@ -2781,10 +2770,10 @@ public static class Action
    {
        private final ActionType type;
        private final T data;
-        private final HdfsContext hdfsContext;
+        private final ConnectorIdentity identity;
        private final String queryId;

-        public Action(ActionType type, T data, HdfsContext hdfsContext, String queryId)
+        public Action(ActionType type, T data, ConnectorIdentity identity, String queryId)
        {
            this.type = requireNonNull(type, "type is null");
            if (type == ActionType.DROP || type == ActionType.DROP_PRESERVE_DATA) {
@@ -2794,7 +2783,7 @@ public Action(ActionType type, T data, HdfsContext hdfsContext, String queryId)
                requireNonNull(data, "data is null");
            }
            this.data = data;
-            this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
+            this.identity = requireNonNull(identity, "identity is null");
            this.queryId = requireNonNull(queryId, "queryId is null");
        }

@@ -2809,9 +2798,9 @@ public T getData()
            return data;
        }

-        public HdfsContext getHdfsContext()
+        public ConnectorIdentity getIdentity()
        {
-            return hdfsContext;
+            return identity;
        }

        public String getQueryId()
@@ -2834,7 +2823,7 @@ private static class TableAndMore
    {
        private final Table table;
        private final Optional<PrincipalPrivileges> principalPrivileges;
-        private final Optional<Path> currentLocation; // unpartitioned table only
+        private final Optional<Location> currentLocation; // unpartitioned table only
        private final Optional<List<String>> fileNames;
        private final boolean ignoreExisting;
        private final PartitionStatistics statistics;
@@ -2844,7 +2833,7 @@ private static class TableAndMore
        public TableAndMore(
                Table table,
                Optional<PrincipalPrivileges> principalPrivileges,
-                Optional<Path> currentLocation,
+                Optional<Location> currentLocation,
                Optional<List<String>> fileNames,
                boolean ignoreExisting,
                PartitionStatistics statistics,
@@ -2880,7 +2869,7 @@ public PrincipalPrivileges getPrincipalPrivileges()
            return principalPrivileges.get();
        }

-        public Optional<Path> getCurrentLocation()
+        public Optional<Location> getCurrentLocation()
        {
            return currentLocation;
        }
@@ -2927,7 +2916,7 @@ private static class TableAndMergeResults
        private final List partitionMergeResults;
        private final List partitions;

-        public TableAndMergeResults(Table table, Optional<PrincipalPrivileges> principalPrivileges, Optional<Path> currentLocation, List partitionMergeResults, List partitions)
+        public TableAndMergeResults(Table table, Optional<PrincipalPrivileges> principalPrivileges, Optional<Location> currentLocation, List partitionMergeResults, List partitions)
        {
            super(table, principalPrivileges, currentLocation, Optional.empty(), false, PartitionStatistics.empty(), PartitionStatistics.empty(), false); // retries are not supported for transactional tables
            this.partitionMergeResults = requireNonNull(partitionMergeResults, "partitionMergeResults is null");
@@ -3038,16 +3027,16 @@ private static class DeclaredIntentionToWrite
    {
        private final String declarationId;
        private final WriteMode mode;
-        private final HdfsContext hdfsContext;
+        private final ConnectorIdentity identity;
        private final String queryId;
-        private final Path rootPath;
+        private final Location rootPath;
        private final SchemaTableName schemaTableName;

-        public DeclaredIntentionToWrite(String declarationId, WriteMode mode, HdfsContext hdfsContext, String queryId, Path stagingPathRoot, SchemaTableName schemaTableName)
+        public DeclaredIntentionToWrite(String declarationId, WriteMode mode, ConnectorIdentity identity, String queryId, Location stagingPathRoot, SchemaTableName schemaTableName)
        {
            this.declarationId = requireNonNull(declarationId, "declarationId is null");
            this.mode = requireNonNull(mode, "mode is null");
-            this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
+            this.identity = requireNonNull(identity, "identity is null");
            this.queryId = requireNonNull(queryId, "queryId is null");
            this.rootPath = requireNonNull(stagingPathRoot, "stagingPathRoot is null");
            this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
@@ -3063,9 +3052,9 @@ public WriteMode getMode()
            return mode;
        }

-        public HdfsContext getHdfsContext()
+        public ConnectorIdentity getIdentity()
        {
-            return hdfsContext;
+            return identity;
        }

        public String getQueryId()
@@ -3073,7 +3062,7 @@ public String getQueryId()
            return queryId;
        }

-        public Path getRootPath()
+        public Location getRootPath()
        {
            return rootPath;
        }
@@ -3088,7 +3077,7 @@ public String toString()
        {
            return toStringHelper(this)
                    .add("mode", mode)
-                    .add("hdfsContext", hdfsContext)
+                    .add("identity", identity)
                    .add("queryId", queryId)
                    .add("rootPath", rootPath)
                    .add("schemaTableName", schemaTableName)
@@ -3096,112 +3085,31 @@ public String toString()
        }
    }

-    private static class DirectoryCleanUpTask
+    private record DirectoryCleanUpTask(ConnectorIdentity identity, Location location, boolean deleteEmptyDirectory)
    {
-        private final HdfsContext context;
-        private final Path path;
-        private final boolean deleteEmptyDirectory;
-
-        public DirectoryCleanUpTask(HdfsContext context, Path path, boolean deleteEmptyDirectory)
-        {
-            this.context = context;
-            this.path = path;
-            this.deleteEmptyDirectory = deleteEmptyDirectory;
-        }
-
-        public HdfsContext getContext()
+        public DirectoryCleanUpTask
        {
-            return context;
-        }
-
-        public Path getPath()
-        {
-            return path;
-        }
-
-        public boolean isDeleteEmptyDirectory()
-        {
-            return deleteEmptyDirectory;
-        }
-
-        @Override
-        public String toString()
-        {
-            return toStringHelper(this)
-                    .add("context", context)
-                    .add("path", path)
-                    .add("deleteEmptyDirectory", deleteEmptyDirectory)
-                    .toString();
+            requireNonNull(identity, "identity is null");
+            requireNonNull(location, "location is null");
        }
    }

-    private static class DirectoryDeletionTask
+    private record DirectoryDeletionTask(ConnectorIdentity identity, Location location)
    {
-        private final HdfsContext context;
-        private final Path path;
-
-        public DirectoryDeletionTask(HdfsContext context, Path path)
-        {
-            this.context = context;
-            this.path = path;
-        }
-
-        public HdfsContext getContext()
+        public DirectoryDeletionTask
        {
-            return context;
-        }
-
-        public Path getPath()
-        {
-            return path;
-        }
-
-        @Override
-        public String toString()
-        {
-            return toStringHelper(this)
-                    .add("context", context)
-                    .add("path", path)
-                    .toString();
+            requireNonNull(identity, "identity is null");
+            requireNonNull(location, "location is null");
        }
    }

-    private static class DirectoryRenameTask
+    private record DirectoryRenameTask(ConnectorIdentity identity, Location renameFrom, Location renameTo)
    {
-        private final HdfsContext context;
-        private final Path renameFrom;
-        private final Path renameTo;
-
-        public DirectoryRenameTask(HdfsContext context, Path renameFrom, Path renameTo)
-        {
-            this.context = requireNonNull(context, "context is null");
-            this.renameFrom = requireNonNull(renameFrom, "renameFrom is null");
-            this.renameTo = requireNonNull(renameTo, "renameTo is null");
-        }
-
-        public HdfsContext getContext()
-        {
-            return context;
-        }
-
-        public Path getRenameFrom()
-        {
-            return renameFrom;
-        }
-
-        public Path getRenameTo()
-        {
-            return renameTo;
-        }
-
-        @Override
-        public String toString()
+        public DirectoryRenameTask
        {
-            return toStringHelper(this)
-                    .add("context", context)
-                    .add("renameFrom", renameFrom)
-                    .add("renameTo", renameTo)
-                    .toString();
+            requireNonNull(identity, "identity is null");
+            requireNonNull(renameFrom, "renameFrom is null");
+            requireNonNull(renameTo, "renameTo is null");
        }
    }
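The three task holders collapse into records; the bodies that remain are compact constructors, which run before the implicit field assignment and therefore only need the requireNonNull checks, while accessors, equals/hashCode, and toString are generated. A minimal standalone illustration of the same shape (not the class from the patch, just the pattern):

    import static java.util.Objects.requireNonNull;

    record RenameTask(String renameFrom, String renameTo)
    {
        RenameTask // compact constructor: no parameter list, validation only
        {
            requireNonNull(renameFrom, "renameFrom is null");
            requireNonNull(renameTo, "renameTo is null");
        }
    }

    // Usage: new RenameTask("a", "b").renameFrom() replaces a hand-written getRenameFrom() getter.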
@@ -3609,7 +3517,7 @@ public List getNotDeletedEligibleItems()

    private interface ExclusiveOperation
    {
-        void execute(HiveMetastoreClosure delegate, HdfsEnvironment hdfsEnvironment);
+        void execute(HiveMetastoreClosure delegate);
    }

    private long allocateWriteId(String dbName, String tableName, long transactionId)
@@ -3649,50 +3557,27 @@ public void commitTransaction(long transactionId)
        delegate.commitTransaction(transactionId);
    }

-    public static void cleanExtraOutputFiles(HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, String queryId, Location location, Set<String> filesToKeep)
+    public static void cleanExtraOutputFiles(TrinoFileSystem fileSystem, String queryId, Location path, Set<String> filesToKeep)
    {
-        Path path = new Path(location.toString());
-        List<String> filesToDelete = new LinkedList<>();
+        List<Location> filesToDelete = new ArrayList<>();
        try {
-            log.debug("Deleting failed attempt files from %s for query %s", path, queryId);
-            FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
-            if (!fileSystem.exists(path)) {
-                // directory may nat exit if no files were actually written
-                return;
-            }
-
-            // files are written flat in a single directory so we do not need to list recursively
-            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(path, false);
-            while (iterator.hasNext()) {
-                Path file = iterator.next().getPath();
-                if (isFileCreatedByQuery(file.getName(), queryId) && !filesToKeep.contains(file.getName())) {
-                    filesToDelete.add(file.getName());
+            Failsafe.with(DELETE_RETRY_POLICY).run(() -> {
+                log.debug("Deleting failed attempt files from %s for query %s", path, queryId);
+                filesToDelete.clear();
+                FileIterator iterator = fileSystem.listFiles(path);
+                while (iterator.hasNext()) {
+                    Location file = iterator.next().location();
+                    if (isFileCreatedByQuery(file.fileName(), queryId) && !filesToKeep.contains(file.fileName())) {
+                        filesToDelete.add(file);
+                    }
                }
-            }
-
-            ImmutableList.Builder<String> deletedFilesBuilder = ImmutableList.builder();
-            Iterator<String> filesToDeleteIterator = filesToDelete.iterator();
-            while (filesToDeleteIterator.hasNext()) {
-                String fileName = filesToDeleteIterator.next();
-                Path filePath = new Path(path, fileName);
-                log.debug("Deleting failed attempt file %s for query %s", filePath, queryId);
-                DELETE_RETRY.run("delete " + filePath, () -> {
-                    checkedDelete(fileSystem, filePath, false);
-                    return null;
-                });
-                deletedFilesBuilder.add(fileName);
-                filesToDeleteIterator.remove();
-            }
-            List<String> deletedFiles = deletedFilesBuilder.build();
-            if (!deletedFiles.isEmpty()) {
-                log.info("Deleted failed attempt files %s from %s for query %s", deletedFiles, path, queryId);
-            }
+                log.debug("Found %s failed attempt file(s) to delete for query %s", filesToDelete.size(), queryId);
+                fileSystem.deleteFiles(filesToDelete);
+            });
        }
-        catch (Exception e) {
-            if (e instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
+        catch (FailsafeException e) {
            // If we fail here query will be rolled back. The optimal outcome would be for rollback to complete successfully and clean up everything for query.
            // Yet if we have problem here, probably rollback will also fail.
            //
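The retry strategy also changes shape in cleanExtraOutputFiles: instead of retrying each file deletion individually through the removed RetryDriver, the whole list-and-bulk-delete block is retried through Failsafe under DELETE_RETRY_POLICY, and exhausted retries surface as a FailsafeException. A minimal sketch of the same wiring with illustrative policy values (not the ones this class uses):

    import dev.failsafe.Failsafe;
    import dev.failsafe.FailsafeException;
    import dev.failsafe.RetryPolicy;
    import dev.failsafe.function.CheckedRunnable;
    import java.time.Duration;

    // Retry a cleanup action a few times; rethrow the underlying cause once attempts are exhausted.
    static void runWithRetries(CheckedRunnable cleanupAction)
    {
        RetryPolicy<Object> retryPolicy = RetryPolicy.builder()
                .withDelay(Duration.ofSeconds(1))
                .withMaxAttempts(3)
                .build();
        try {
            Failsafe.with(retryPolicy).run(cleanupAction);
        }
        catch (FailsafeException e) {
            // checked exceptions from the action arrive wrapped; the cause is the real failure
            throw new RuntimeException("Cleanup failed after retries", e.getCause());
        }
    }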
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java
index e0ac0f354589..902a8cd0d80b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java
@@ -49,7 +49,6 @@
 import org.apache.hadoop.fs.viewfs.ViewFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;

-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -381,22 +380,6 @@ public static void createDirectory(HdfsContext context, HdfsEnvironment hdfsEnvi
        }
    }

-    public static void checkedDelete(FileSystem fileSystem, Path file, boolean recursive)
-            throws IOException
-    {
-        try {
-            if (!fileSystem.delete(file, recursive)) {
-                if (fileSystem.exists(file)) {
-                    // only throw exception if file still exists
-                    throw new IOException("Failed to delete " + file);
-                }
-            }
-        }
-        catch (FileNotFoundException ignored) {
-            // ok
-        }
-    }
-
    public static boolean isWritableType(HiveType hiveType)
    {
        return isWritableType(hiveType.getTypeInfo());
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveLocal.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveLocal.java
index d00fe4350790..8576ef774ee4 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveLocal.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveLocal.java
@@ -34,7 +34,6 @@
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.security.PrincipalType;
 import io.trino.testing.MaterializedResult;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -45,8 +44,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
-import java.net.URI;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -241,7 +240,7 @@ public void testSparkBucketedTableValidation()
    private void doTestSparkBucketedTableValidation(SchemaTableName tableName)
            throws Exception
    {
-        java.nio.file.Path externalLocation = copyResourceDirToTemporaryDirectory("spark_bucketed_nation");
+        Path externalLocation = copyResourceDirToTemporaryDirectory("spark_bucketed_nation");
        try {
            createExternalTable(
                    tableName,
@@ -257,7 +256,7 @@ private void doTestSparkBucketedTableValidation(SchemaTableName tableName)
                            BUCKETING_V1,
                            3,
                            ImmutableList.of(new SortingColumn("name", SortingColumn.Order.ASCENDING)))),
-                    new Path(URI.create("file://" + externalLocation.toString())));
+                    Location.of(externalLocation.toUri().toString()));

            assertReadFailsWithMessageMatching(ORC, tableName, "Hive table is corrupt\\. File '.*/.*' is for bucket [0-2], but contains a row for bucket [0-2].");
            markTableAsCreatedBySpark(tableName, "orc");
@@ -294,7 +293,7 @@ private void markTableAsCreatedBySpark(SchemaTableName tableName, String provide
        }
    }

-    private void createExternalTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List columns, List partitionColumns, Optional bucketProperty, Path externalLocation)
+    private void createExternalTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List columns, List partitionColumns, Optional bucketProperty, Location externalLocation)
    {
        try (Transaction transaction = newTransaction()) {
            ConnectorSession session = newSession();
@@ -327,17 +326,17 @@ private void createExternalTable(SchemaTableName schemaTableName, HiveStorageFor
        }
    }

-    private java.nio.file.Path copyResourceDirToTemporaryDirectory(String resourceName)
+    private Path copyResourceDirToTemporaryDirectory(String resourceName)
            throws IOException
    {
-        java.nio.file.Path tempDir = java.nio.file.Files.createTempDirectory(getClass().getSimpleName()).normalize();
+        Path tempDir = java.nio.file.Files.createTempDirectory(getClass().getSimpleName()).normalize();
        log.info("Copying resource dir '%s' to %s", resourceName, tempDir);
        ClassPath.from(getClass().getClassLoader())
                .getResources().stream()
                .filter(resourceInfo -> resourceInfo.getResourceName().startsWith(resourceName))
                .forEach(resourceInfo -> {
                    try {
-                        java.nio.file.Path target = tempDir.resolve(resourceInfo.getResourceName());
+                        Path target = tempDir.resolve(resourceInfo.getResourceName());
                        java.nio.file.Files.createDirectories(target.getParent());
                        try (InputStream inputStream = resourceInfo.asByteSource().openStream()) {
                            copy(inputStream, target);
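One small point in the AbstractTestHiveLocal change above: the Location for the external table is now derived from Path.toUri() rather than by concatenating "file://" onto the raw path string, which presumably yields a well-formed file:///... URI with proper escaping. A tiny sketch of that conversion (the helper name is ours):

    import io.trino.filesystem.Location;
    import java.nio.file.Path;

    static Location localDirectoryAsLocation(Path directory)
    {
        // toUri() produces a "file:///..." URI; Location.of accepts its string form
        return Location.of(directory.toUri().toString());
    }

    // Example: a temporary directory under /tmp becomes a file:///tmp/... location.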
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java
index 44f121c74005..63f1346c9726 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestSemiTransactionalHiveMetastore.java
@@ -35,7 +35,7 @@
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.trino.plugin.hive.HiveBasicStatistics.createEmptyStatistics;
-import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
 import static io.trino.plugin.hive.acid.AcidOperation.INSERT;
 import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1;
 import static io.trino.testing.TestingConnectorSession.SESSION;
@@ -78,7 +78,8 @@ public void testParallelPartitionDrops()

    private SemiTransactionalHiveMetastore getSemiTransactionalHiveMetastoreWithDropExecutor(Executor dropExecutor)
    {
-        return new SemiTransactionalHiveMetastore(HDFS_ENVIRONMENT,
+        return new SemiTransactionalHiveMetastore(
+                HDFS_FILE_SYSTEM_FACTORY,
                new HiveMetastoreClosure(new TestingHiveMetastore()),
                directExecutor(),
                dropExecutor,
@@ -118,7 +119,8 @@ public void testParallelUpdateStatisticsOperations()

    private SemiTransactionalHiveMetastore getSemiTransactionalHiveMetastoreWithUpdateExecutor(Executor updateExecutor)
    {
-        return new SemiTransactionalHiveMetastore(HDFS_ENVIRONMENT,
+        return new SemiTransactionalHiveMetastore(
+                HDFS_FILE_SYSTEM_FACTORY,
                new HiveMetastoreClosure(new TestingHiveMetastore()),
                directExecutor(),
                directExecutor(),