From 87a988fdda5fc68fd3b04947e000ef190fa54f61 Mon Sep 17 00:00:00 2001 From: Manu Zhang Date: Thu, 27 Jun 2024 00:28:00 +0800 Subject: [PATCH] Spark 3.5: Parallelize reading files in snapshot and migrate procedures (#10037) --- .../apache/iceberg/actions/MigrateTable.java | 12 ++ .../apache/iceberg/actions/SnapshotTable.java | 12 ++ .../iceberg/data/TableMigrationUtil.java | 57 +++++-- docs/docs/spark-procedures.md | 4 +- .../extensions/TestMigrateTableProcedure.java | 42 ++++++ .../TestSnapshotTableProcedure.java | 39 +++++ .../apache/iceberg/spark/SparkTableUtil.java | 139 +++++++++++++++++- .../actions/MigrateTableSparkAction.java | 11 +- .../actions/SnapshotTableSparkAction.java | 11 +- .../procedures/MigrateTableProcedure.java | 10 +- .../procedures/SnapshotTableProcedure.java | 9 +- .../spark/actions/TestMigrateTableAction.java | 68 +++++++++ .../actions/TestSnapshotTableAction.java | 68 +++++++++ 13 files changed, 459 insertions(+), 23 deletions(-) create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java index 5438c4b65a46..8f9e8d69c90a 100644 --- a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java +++ b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java @@ -19,6 +19,7 @@ package org.apache.iceberg.actions; import java.util.Map; +import java.util.concurrent.ExecutorService; /** An action that migrates an existing table to Iceberg. */ public interface MigrateTable extends Action { @@ -60,6 +61,17 @@ default MigrateTable backupTableName(String tableName) { throw new UnsupportedOperationException("Backup table name cannot be specified"); } + /** + * Sets the executor service to use for parallel file reading. The default is not using executor + * service. + * + * @param service executor service + * @return this for method chaining + */ + default MigrateTable executeWith(ExecutorService service) { + throw new UnsupportedOperationException("Setting executor service is not supported"); + } + /** The action result that contains a summary of the execution. */ interface Result { /** Returns the number of migrated data files. */ diff --git a/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java b/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java index 37c600ab0392..a28e94bcdbe5 100644 --- a/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java +++ b/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java @@ -19,6 +19,7 @@ package org.apache.iceberg.actions; import java.util.Map; +import java.util.concurrent.ExecutorService; /** An action that creates an independent snapshot of an existing table. */ public interface SnapshotTable extends Action { @@ -57,6 +58,17 @@ public interface SnapshotTable extends Action listPartition( MetricsConfig metricsSpec, NameMapping mapping, int parallelism) { - ExecutorService service = null; + return listPartition( + partition, + partitionUri, + format, + spec, + conf, + metricsSpec, + mapping, + migrationService(parallelism)); + } + + /** + * Returns the data files in a partition by listing the partition location. Metrics are read from + * the files and the file reading is done in parallel by a specified number of threads. + * + *
+   * <p>For Parquet and ORC partitions, this will read metrics from the file footer. For Avro
+   * partitions, metrics other than row count are set to null.
+   *
Note: certain metrics, like NaN counts, that are only supported by Iceberg file writers but + * not file footers, will not be populated. + * + * @param partition map of column names to column values for the partition + * @param partitionUri partition location URI + * @param format partition format, avro, parquet or orc + * @param spec a partition spec + * @param conf a Hadoop conf + * @param metricsSpec a metrics conf + * @param mapping a name mapping + * @param service executor service to use for file reading + * @return a List of DataFile + */ + public static List listPartition( + Map partition, + String partitionUri, + String format, + PartitionSpec spec, + Configuration conf, + MetricsConfig metricsSpec, + NameMapping mapping, + ExecutorService service) { try { List partitionValues = spec.fields().stream() @@ -130,8 +166,7 @@ public static List listPartition( Tasks.Builder task = Tasks.range(fileStatus.size()).stopOnFailure().throwFailureWhenFinished(); - if (parallelism > 1) { - service = migrationService(parallelism); + if (service != null) { task.executeWith(service); } @@ -215,11 +250,7 @@ private static DataFile buildDataFile( .build(); } - private static ExecutorService migrationService(int parallelism) { - return MoreExecutors.getExitingExecutorService( - (ThreadPoolExecutor) - Executors.newFixedThreadPool( - parallelism, - new ThreadFactoryBuilder().setNameFormat("table-migration-%d").build())); + public static ExecutorService migrationService(int parallelism) { + return parallelism == 1 ? null : ThreadPools.newWorkerPool("table-migration", parallelism); } } diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index dc439c04c855..31172fb531bd 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -546,6 +546,7 @@ See [`migrate`](#migrate) to replace an existing table with an Iceberg table. | `table` | ✔️ | string | Name of the new Iceberg table to create | | `location` | | string | Table location for the new table (delegated to the catalog by default) | | `properties` | ️ | map | Properties to add to the newly created table | +| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) | #### Output @@ -588,6 +589,7 @@ By default, the original table is retained with the name `table_BACKUP_`. | `properties` | ️ | map | Properties for the new Iceberg table | | `drop_backup` | | boolean | When true, the original table will not be retained as backup (defaults to false) | | `backup_table_name` | | string | Name of the table that will be retained as backup (defaults to `table_BACKUP_`) | +| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) | #### Output @@ -629,7 +631,7 @@ will then treat these files as if they are part of the set of files owned by Ic | `source_table` | ✔️ | string | Table where files should come from, paths are also possible in the form of \`file_format\`.\`path\` | | `partition_filter` | ️ | map | A map of partitions in the source table to import from | | `check_duplicate_files` | ️ | boolean | Whether to prevent files existing in the table from being added (defaults to true) | -| `parallelism` | | int | number of threads to use for file reading (defaults to 1) | +| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) | Warning : Schema is not validated, adding files with different schema to the Iceberg table will cause issues. 
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java index 735a3bdee863..23c08b2572f4 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.file.Files; +import java.util.List; import java.util.Map; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; @@ -231,4 +232,45 @@ public void testMigrateEmptyTable() throws Exception { Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName); assertThat(result).isEqualTo(0L); } + + @TestTemplate + public void testMigrateWithParallelism() throws IOException { + assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog"); + + String location = Files.createTempDirectory(temp, "junit").toFile().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", + tableName, location); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + List result = + sql("CALL %s.system.migrate(table => '%s', parallelism => %d)", catalogName, tableName, 2); + assertEquals("Procedure output must match", ImmutableList.of(row(2L)), result); + + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1L, "a"), row(2L, "b")), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @TestTemplate + public void testMigrateWithInvalidParallelism() throws IOException { + assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog"); + + String location = Files.createTempDirectory(temp, "junit").toFile().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", + tableName, location); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + assertThatThrownBy( + () -> + sql( + "CALL %s.system.migrate(table => '%s', parallelism => %d)", + catalogName, tableName, -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Parallelism should be larger than 0"); + } } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java index 3b093947cacc..cb184043490c 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.file.Files; +import java.util.List; import java.util.Map; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; @@ -223,4 +224,42 @@ public void testInvalidSnapshotsCases() throws IOException { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Cannot handle an empty identifier for argument table"); } + + @TestTemplate + public void testSnapshotWithParallelism() throws IOException { + String location = Files.createTempDirectory(temp, "junit").toFile().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data 
string) USING parquet LOCATION '%s'", + sourceName, location); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName); + + List result = + sql( + "CALL %s.system.snapshot(source_table => '%s', table => '%s', parallelism => %d)", + catalogName, sourceName, tableName, 2); + assertEquals("Procedure output must match", ImmutableList.of(row(2L)), result); + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1L, "a"), row(2L, "b")), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @TestTemplate + public void testSnapshotWithInvalidParallelism() throws IOException { + String location = Files.createTempDirectory(temp, "junit").toFile().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", + sourceName, location); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName); + + assertThatThrownBy( + () -> + sql( + "CALL %s.system.snapshot(source_table => '%s', table => '%s', parallelism => %d)", + catalogName, sourceName, tableName, -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Parallelism should be larger than 0"); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index eb27e1483d13..8447dbdcead1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -290,6 +291,24 @@ private static List listPartition( parallelism); } + private static List listPartition( + SparkPartition partition, + PartitionSpec spec, + SerializableConfiguration conf, + MetricsConfig metricsConfig, + NameMapping mapping, + ExecutorService service) { + return TableMigrationUtil.listPartition( + partition.values, + partition.uri, + partition.format, + spec, + conf.get(), + metricsConfig, + mapping, + service); + } + private static SparkPartition toSparkPartition( CatalogTablePartition partition, CatalogTable table) { Option locationUri = partition.storage().locationUri(); @@ -388,6 +407,54 @@ public static void importSparkTable( spark, sourceTableIdent, targetTable, stagingDir, partitionFilter, checkDuplicateFiles, 1); } + /** + * Import files from an existing Spark table to an Iceberg table. + * + *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param parallelism number of threads to use for file reading
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      int parallelism) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        TableMigrationUtil.migrationService(parallelism));
+  }
+
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param service executor service to use for file reading
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      ExecutorService service) {
+    importSparkTable(
+        spark, sourceTableIdent, targetTable, stagingDir, Collections.emptyMap(), false, service);
+  }
+
   /**
    * Import files from an existing Spark table to an Iceberg table.
    *
@@ -411,6 +478,39 @@ public static void importSparkTable(
       Map<String, String> partitionFilter,
       boolean checkDuplicateFiles,
       int parallelism) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        partitionFilter,
+        checkDuplicateFiles,
+        TableMigrationUtil.migrationService(parallelism));
+  }
+
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param partitionFilter only import partitions whose values match those in the map, can be
+   *     partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   * @param service executor service to use for file reading
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      Map<String, String> partitionFilter,
+      boolean checkDuplicateFiles,
+      ExecutorService service) {
     SessionCatalog catalog = spark.sessionState().catalog();
     String db =
@@ -431,7 +531,7 @@ public static void importSparkTable(
 
       if (Objects.equal(spec, PartitionSpec.unpartitioned())) {
         importUnpartitionedSparkTable(
-            spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles, parallelism);
+            spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles, service);
       } else {
         List<SparkPartition> sourceTablePartitions =
             getPartitions(spark, sourceTableIdent, partitionFilter);
@@ -445,7 +545,7 @@ public static void importSparkTable(
             spec,
             stagingDir,
             checkDuplicateFiles,
-            parallelism);
+            service);
         }
       }
     } catch (AnalysisException e) {
@@ -504,7 +604,7 @@ private static void importUnpartitionedSparkTable(
       TableIdentifier sourceTableIdent,
       Table targetTable,
       boolean checkDuplicateFiles,
-      int parallelism) {
+      ExecutorService service) {
     try {
       CatalogTable sourceTable = spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
       Option<String> format =
@@ -530,7 +630,7 @@ private static void importUnpartitionedSparkTable(
               conf,
               metricsConfig,
               nameMapping,
-              parallelism);
+              service);
 
       if (checkDuplicateFiles) {
         Dataset<Row> importedFiles =
@@ -600,6 +700,35 @@ public static void importSparkPartitions(
       String stagingDir,
       boolean checkDuplicateFiles,
       int parallelism) {
+    importSparkPartitions(
+        spark,
+        partitions,
+        targetTable,
+        spec,
+        stagingDir,
+        checkDuplicateFiles,
+        TableMigrationUtil.migrationService(parallelism));
+  }
+
+  /**
+   * Import files from given partitions to an Iceberg table.
+ * + * @param spark a Spark session + * @param partitions partitions to import + * @param targetTable an Iceberg table where to import the data + * @param spec a partition spec + * @param stagingDir a staging directory to store temporary manifest files + * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file + * @param service executor service to use for file reading + */ + public static void importSparkPartitions( + SparkSession spark, + List partitions, + Table targetTable, + PartitionSpec spec, + String stagingDir, + boolean checkDuplicateFiles, + ExecutorService service) { Configuration conf = spark.sessionState().newHadoopConf(); SerializableConfiguration serializableConf = new SerializableConfiguration(conf); int listingParallelism = @@ -627,7 +756,7 @@ public static void importSparkPartitions( serializableConf, metricsConfig, nameMapping, - parallelism) + service) .iterator(), Encoders.javaSerialization(DataFile.class)); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java index 5f3cdd3f035c..bdffeb465405 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.actions; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; @@ -59,6 +60,7 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction 0, "Parallelism should be larger than 0"); + migrateTableSparkAction = + migrateTableSparkAction.executeWith(executorService(parallelism, "table-migration")); + } + MigrateTable.Result result = migrateTableSparkAction.execute(); return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())}; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java index 7a015a51e8ed..f709f64ebf62 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java @@ -38,7 +38,8 @@ class SnapshotTableProcedure extends BaseProcedure { ProcedureParameter.required("source_table", DataTypes.StringType), ProcedureParameter.required("table", DataTypes.StringType), ProcedureParameter.optional("location", DataTypes.StringType), - ProcedureParameter.optional("properties", STRING_MAP) + ProcedureParameter.optional("properties", STRING_MAP), + ProcedureParameter.optional("parallelism", DataTypes.IntegerType) }; private static final StructType OUTPUT_TYPE = @@ -102,6 +103,12 @@ public InternalRow[] call(InternalRow args) { action.tableLocation(snapshotLocation); } + if (!args.isNullAt(4)) { + int parallelism = args.getInt(4); + Preconditions.checkArgument(parallelism > 0, "Parallelism should be larger than 0"); + action = action.executeWith(executorService(parallelism, "table-snapshot")); + } + SnapshotTable.Result result = action.tableProperties(properties).execute(); return new InternalRow[] {newInternalRow(result.importedDataFilesCount())}; } diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java new file mode 100644 index 000000000000..94afa50cf4b8 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.spark.CatalogTestBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestMigrateTableAction extends CatalogTestBase { + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName); + } + + @TestTemplate + public void testMigrateWithParallelTasks() throws IOException { + assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog"); + String location = Files.createTempDirectory(temp, "junit").toFile().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", + tableName, location); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + AtomicInteger migrationThreadsIndex = new AtomicInteger(0); + SparkActions.get() + .migrateTable(tableName) + .executeWith( + Executors.newFixedThreadPool( + 4, + runnable -> { + Thread thread = new Thread(runnable); + thread.setName("table-migration-" + migrationThreadsIndex.getAndIncrement()); + thread.setDaemon(true); + return thread; + })) + .execute(); + assertThat(migrationThreadsIndex.get()).isEqualTo(2); + } +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java new file mode 100644 index 000000000000..3b6869c397a5 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.spark.CatalogTestBase; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestSnapshotTableAction extends CatalogTestBase { + private static final String sourceName = "spark_catalog.default.source"; + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + sql("DROP TABLE IF EXISTS %s PURGE", sourceName); + } + + @TestTemplate + public void testSnapshotWithParallelTasks() throws IOException { + String location = Files.createTempDirectory(temp, "junit").toFile().toString(); + sql( + "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'", + sourceName, location); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName); + + AtomicInteger snapshotThreadsIndex = new AtomicInteger(0); + SparkActions.get() + .snapshotTable(sourceName) + .as(tableName) + .executeWith( + Executors.newFixedThreadPool( + 4, + runnable -> { + Thread thread = new Thread(runnable); + thread.setName("table-snapshot-" + snapshotThreadsIndex.getAndIncrement()); + thread.setDaemon(true); + return thread; + })) + .execute(); + assertThat(snapshotThreadsIndex.get()).isEqualTo(2); + } +}
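
Usage sketch (not part of the patch itself): the new parallelism option can be driven either
through the SQL procedures or through the action API shown in the tests above. This is a minimal
sketch assuming Spark 3.5 with the Iceberg Spark runtime and SQL extensions on an active session;
the table names and pool size are illustrative.

    // SQL route, via the procedures extended above:
    //   CALL spark_catalog.system.migrate(table => 'db.sample', parallelism => 4)
    //   CALL spark_catalog.system.snapshot(
    //     source_table => 'db.sample', table => 'db.sample_iceberg', parallelism => 4)

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.iceberg.spark.actions.SparkActions;

    public class MigrateWithParallelism {
      public static void main(String[] args) {
        // Programmatic route: pass any ExecutorService; the caller owns its lifecycle.
        // The procedures instead build a pool via TableMigrationUtil.migrationService(parallelism),
        // which returns null for parallelism == 1, so single-threaded behavior is unchanged.
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
          SparkActions.get() // requires an active SparkSession
              .migrateTable("db.sample") // illustrative table name
              .executeWith(service)
              .execute();
        } finally {
          service.shutdown();
        }
      }
    }

Note that the procedures validate the argument: values less than 1 are rejected with
"Parallelism should be larger than 0", per the checks added in MigrateTableProcedure and
SnapshotTableProcedure above.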