From b5f81c4228fc5a5ceb3bf2e1bb1b1d2ec771380f Mon Sep 17 00:00:00 2001 From: manuzhang Date: Thu, 29 Aug 2024 23:35:39 +0800 Subject: [PATCH] Spark 3.3, Spark 3.4: Parallelize reading files in snapshot and migrate procedures Back-port of https://github.com/apache/iceberg/pull/10037 --- .../apache/iceberg/spark/SparkTableUtil.java | 139 +++++++++++++++++- .../actions/MigrateTableSparkAction.java | 11 +- .../actions/SnapshotTableSparkAction.java | 11 +- .../procedures/MigrateTableProcedure.java | 17 ++- .../procedures/SnapshotTableProcedure.java | 9 +- .../spark/actions/TestMigrateTableAction.java | 71 +++++++++ .../actions/TestSnapshotTableAction.java | 74 ++++++++++ .../apache/iceberg/spark/SparkTableUtil.java | 139 +++++++++++++++++- .../actions/MigrateTableSparkAction.java | 11 +- .../actions/SnapshotTableSparkAction.java | 11 +- .../procedures/MigrateTableProcedure.java | 10 +- .../procedures/SnapshotTableProcedure.java | 9 +- .../spark/actions/TestMigrateTableAction.java | 71 +++++++++ .../actions/TestSnapshotTableAction.java | 74 ++++++++++ 14 files changed, 635 insertions(+), 22 deletions(-) create mode 100644 spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java create mode 100644 spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java create mode 100644 spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java create mode 100644 spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index fb9f6325fab0..dfd0b58ffbee 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -327,6 +328,24 @@ public static List listPartition( parallelism); } + private static List listPartition( + SparkPartition partition, + PartitionSpec spec, + SerializableConfiguration conf, + MetricsConfig metricsConfig, + NameMapping mapping, + ExecutorService service) { + return TableMigrationUtil.listPartition( + partition.values, + partition.uri, + partition.format, + spec, + conf.get(), + metricsConfig, + mapping, + service); + } + private static SparkPartition toSparkPartition( CatalogTablePartition partition, CatalogTable table) { Option locationUri = partition.storage().locationUri(); @@ -425,6 +444,54 @@ public static void importSparkTable( spark, sourceTableIdent, targetTable, stagingDir, partitionFilter, checkDuplicateFiles, 1); } + /** + * Import files from an existing Spark table to an Iceberg table. + * + *
<p>
The import uses the Spark session to get table metadata. It assumes no operation is going on + * the original and target table and thus is not thread-safe. + * + * @param spark a Spark session + * @param sourceTableIdent an identifier of the source Spark table + * @param targetTable an Iceberg table where to import the data + * @param stagingDir a staging directory to store temporary manifest files + * @param parallelism number of threads to use for file reading + */ + public static void importSparkTable( + SparkSession spark, + TableIdentifier sourceTableIdent, + Table targetTable, + String stagingDir, + int parallelism) { + importSparkTable( + spark, + sourceTableIdent, + targetTable, + stagingDir, + TableMigrationUtil.migrationService(parallelism)); + } + + /** + * Import files from an existing Spark table to an Iceberg table. + * + *
<p>
The import uses the Spark session to get table metadata. It assumes no operation is going on + * the original and target table and thus is not thread-safe. + * + * @param spark a Spark session + * @param sourceTableIdent an identifier of the source Spark table + * @param targetTable an Iceberg table where to import the data + * @param stagingDir a staging directory to store temporary manifest files + * @param service executor service to use for file reading + */ + public static void importSparkTable( + SparkSession spark, + TableIdentifier sourceTableIdent, + Table targetTable, + String stagingDir, + ExecutorService service) { + importSparkTable( + spark, sourceTableIdent, targetTable, stagingDir, Collections.emptyMap(), false, service); + } + /** * Import files from an existing Spark table to an Iceberg table. * @@ -448,6 +515,39 @@ public static void importSparkTable( Map partitionFilter, boolean checkDuplicateFiles, int parallelism) { + importSparkTable( + spark, + sourceTableIdent, + targetTable, + stagingDir, + partitionFilter, + checkDuplicateFiles, + TableMigrationUtil.migrationService(parallelism)); + } + + /** + * Import files from an existing Spark table to an Iceberg table. + * + *
<p>
The import uses the Spark session to get table metadata. It assumes no operation is going on + * the original and target table and thus is not thread-safe. + * + * @param spark a Spark session + * @param sourceTableIdent an identifier of the source Spark table + * @param targetTable an Iceberg table where to import the data + * @param stagingDir a staging directory to store temporary manifest files + * @param partitionFilter only import partitions whose values match those in the map, can be + * partially defined + * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file + * @param service executor service to use for file reading + */ + public static void importSparkTable( + SparkSession spark, + TableIdentifier sourceTableIdent, + Table targetTable, + String stagingDir, + Map partitionFilter, + boolean checkDuplicateFiles, + ExecutorService service) { SessionCatalog catalog = spark.sessionState().catalog(); String db = @@ -468,7 +568,7 @@ public static void importSparkTable( if (Objects.equal(spec, PartitionSpec.unpartitioned())) { importUnpartitionedSparkTable( - spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles, parallelism); + spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles, service); } else { List sourceTablePartitions = getPartitions(spark, sourceTableIdent, partitionFilter); @@ -482,7 +582,7 @@ public static void importSparkTable( spec, stagingDir, checkDuplicateFiles, - parallelism); + service); } } } catch (AnalysisException e) { @@ -541,7 +641,7 @@ private static void importUnpartitionedSparkTable( TableIdentifier sourceTableIdent, Table targetTable, boolean checkDuplicateFiles, - int parallelism) { + ExecutorService service) { try { CatalogTable sourceTable = spark.sessionState().catalog().getTableMetadata(sourceTableIdent); Option format = @@ -567,7 +667,7 @@ private static void importUnpartitionedSparkTable( conf, metricsConfig, nameMapping, - parallelism); + service); if (checkDuplicateFiles) { Dataset importedFiles = @@ -637,6 +737,35 @@ public static void importSparkPartitions( String stagingDir, boolean checkDuplicateFiles, int parallelism) { + importSparkPartitions( + spark, + partitions, + targetTable, + spec, + stagingDir, + checkDuplicateFiles, + TableMigrationUtil.migrationService(parallelism)); + } + + /** + * Import files from given partitions to an Iceberg table. 
+ * + * @param spark a Spark session + * @param partitions partitions to import + * @param targetTable an Iceberg table where to import the data + * @param spec a partition spec + * @param stagingDir a staging directory to store temporary manifest files + * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file + * @param service executor service to use for file reading + */ + public static void importSparkPartitions( + SparkSession spark, + List partitions, + Table targetTable, + PartitionSpec spec, + String stagingDir, + boolean checkDuplicateFiles, + ExecutorService service) { Configuration conf = spark.sessionState().newHadoopConf(); SerializableConfiguration serializableConf = new SerializableConfiguration(conf); int listingParallelism = @@ -664,7 +793,7 @@ public static void importSparkPartitions( serializableConf, metricsConfig, nameMapping, - parallelism) + service) .iterator(), Encoders.javaSerialization(DataFile.class)); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java index fe8acf0157d3..0eb2a99f2f49 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.actions; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; @@ -59,6 +60,7 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction 0, "Parallelism should be larger than 0"); + migrateTableSparkAction = + migrateTableSparkAction.executeWith(executorService(parallelism, "table-migration")); + } + + MigrateTable.Result result = migrateTableSparkAction.execute(); + return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())}; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java index 7a015a51e8ed..f709f64ebf62 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java @@ -38,7 +38,8 @@ class SnapshotTableProcedure extends BaseProcedure { ProcedureParameter.required("source_table", DataTypes.StringType), ProcedureParameter.required("table", DataTypes.StringType), ProcedureParameter.optional("location", DataTypes.StringType), - ProcedureParameter.optional("properties", STRING_MAP) + ProcedureParameter.optional("properties", STRING_MAP), + ProcedureParameter.optional("parallelism", DataTypes.IntegerType) }; private static final StructType OUTPUT_TYPE = @@ -102,6 +103,12 @@ public InternalRow[] call(InternalRow args) { action.tableLocation(snapshotLocation); } + if (!args.isNullAt(4)) { + int parallelism = args.getInt(4); + Preconditions.checkArgument(parallelism > 0, "Parallelism should be larger than 0"); + action = action.executeWith(executorService(parallelism, "table-snapshot")); + } + SnapshotTable.Result result = action.tableProperties(properties).execute(); return new InternalRow[] {newInternalRow(result.importedDataFilesCount())}; } diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
new file mode 100644
index 000000000000..6989e2a15cb8
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMigrateTableAction extends SparkCatalogTestBase {
+  private static final String TABLE_NAME = "spark_catalog.default.table";
+
+  public TestMigrateTableAction(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", TABLE_NAME);
+  }
+
+  @Test
+  public void testMigrateWithParallelTasks() throws IOException {
+    String location = temp.newFolder().toURI().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        TABLE_NAME, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", TABLE_NAME);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", TABLE_NAME);
+
+    AtomicInteger migrationThreadsIndex = new AtomicInteger(0);
+    SparkActions.get()
+        .migrateTable(TABLE_NAME)
+        .executeWith(
+            Executors.newFixedThreadPool(
+                4,
+                runnable -> {
+                  Thread thread = new Thread(runnable);
+                  thread.setName("table-migration-" + migrationThreadsIndex.getAndIncrement());
+                  thread.setDaemon(true);
+                  return thread;
+                }))
+        .execute();
+    Assert.assertEquals(2, migrationThreadsIndex.get());
+  }
+}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
new file mode 100644
index 000000000000..412a73870d22
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableAction extends SparkCatalogTestBase {
+  private static final String TABLE_NAME = "spark_catalog.default.table";
+  private static final String SOURCE_NAME = "spark_catalog.default.source";
+
+  public TestSnapshotTableAction(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
+    sql("DROP TABLE IF EXISTS %s PURGE", SOURCE_NAME);
+  }
+
+  @Test
+  public void testSnapshotWithParallelTasks() throws IOException {
+    String location = temp.newFolder().toURI().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        SOURCE_NAME, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", SOURCE_NAME);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", SOURCE_NAME);
+
+    AtomicInteger snapshotThreadsIndex = new AtomicInteger(0);
+    SparkActions.get()
+        .snapshotTable(SOURCE_NAME)
+        .as(TABLE_NAME)
+        .executeWith(
+            Executors.newFixedThreadPool(
+                4,
+                runnable -> {
+                  Thread thread = new Thread(runnable);
+                  thread.setName("table-snapshot-" + snapshotThreadsIndex.getAndIncrement());
+                  thread.setDaemon(true);
+                  return thread;
+                }))
+        .execute();
+    Assert.assertEquals(2, snapshotThreadsIndex.get());
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 8b25d516a920..7a96e97fb98a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -290,6 +291,24 @@ private static List<DataFile> listPartition(
         parallelism);
   }
 
+  private static List<DataFile> listPartition(
+      SparkPartition partition,
+      PartitionSpec spec,
+      SerializableConfiguration conf,
+      MetricsConfig metricsConfig,
+      NameMapping mapping,
+      ExecutorService service) {
+    return TableMigrationUtil.listPartition(
+        partition.values,
+        partition.uri,
+        partition.format,
+        spec,
+        conf.get(),
+        metricsConfig,
+        mapping,
+        service);
+  }
+
   private static SparkPartition toSparkPartition(
       CatalogTablePartition partition, CatalogTable table) {
     Option<URI> locationUri = 
partition.storage().locationUri(); @@ -388,6 +407,54 @@ public static void importSparkTable( spark, sourceTableIdent, targetTable, stagingDir, partitionFilter, checkDuplicateFiles, 1); } + /** + * Import files from an existing Spark table to an Iceberg table. + * + *
<p>
The import uses the Spark session to get table metadata. It assumes no operation is going on + * the original and target table and thus is not thread-safe. + * + * @param spark a Spark session + * @param sourceTableIdent an identifier of the source Spark table + * @param targetTable an Iceberg table where to import the data + * @param stagingDir a staging directory to store temporary manifest files + * @param parallelism number of threads to use for file reading + */ + public static void importSparkTable( + SparkSession spark, + TableIdentifier sourceTableIdent, + Table targetTable, + String stagingDir, + int parallelism) { + importSparkTable( + spark, + sourceTableIdent, + targetTable, + stagingDir, + TableMigrationUtil.migrationService(parallelism)); + } + + /** + * Import files from an existing Spark table to an Iceberg table. + * + *
<p>
The import uses the Spark session to get table metadata. It assumes no operation is going on + * the original and target table and thus is not thread-safe. + * + * @param spark a Spark session + * @param sourceTableIdent an identifier of the source Spark table + * @param targetTable an Iceberg table where to import the data + * @param stagingDir a staging directory to store temporary manifest files + * @param service executor service to use for file reading + */ + public static void importSparkTable( + SparkSession spark, + TableIdentifier sourceTableIdent, + Table targetTable, + String stagingDir, + ExecutorService service) { + importSparkTable( + spark, sourceTableIdent, targetTable, stagingDir, Collections.emptyMap(), false, service); + } + /** * Import files from an existing Spark table to an Iceberg table. * @@ -411,6 +478,39 @@ public static void importSparkTable( Map partitionFilter, boolean checkDuplicateFiles, int parallelism) { + importSparkTable( + spark, + sourceTableIdent, + targetTable, + stagingDir, + partitionFilter, + checkDuplicateFiles, + TableMigrationUtil.migrationService(parallelism)); + } + + /** + * Import files from an existing Spark table to an Iceberg table. + * + *
<p>
The import uses the Spark session to get table metadata. It assumes no operation is going on + * the original and target table and thus is not thread-safe. + * + * @param spark a Spark session + * @param sourceTableIdent an identifier of the source Spark table + * @param targetTable an Iceberg table where to import the data + * @param stagingDir a staging directory to store temporary manifest files + * @param partitionFilter only import partitions whose values match those in the map, can be + * partially defined + * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file + * @param service executor service to use for file reading + */ + public static void importSparkTable( + SparkSession spark, + TableIdentifier sourceTableIdent, + Table targetTable, + String stagingDir, + Map partitionFilter, + boolean checkDuplicateFiles, + ExecutorService service) { SessionCatalog catalog = spark.sessionState().catalog(); String db = @@ -431,7 +531,7 @@ public static void importSparkTable( if (Objects.equal(spec, PartitionSpec.unpartitioned())) { importUnpartitionedSparkTable( - spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles, parallelism); + spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles, service); } else { List sourceTablePartitions = getPartitions(spark, sourceTableIdent, partitionFilter); @@ -445,7 +545,7 @@ public static void importSparkTable( spec, stagingDir, checkDuplicateFiles, - parallelism); + service); } } } catch (AnalysisException e) { @@ -504,7 +604,7 @@ private static void importUnpartitionedSparkTable( TableIdentifier sourceTableIdent, Table targetTable, boolean checkDuplicateFiles, - int parallelism) { + ExecutorService service) { try { CatalogTable sourceTable = spark.sessionState().catalog().getTableMetadata(sourceTableIdent); Option format = @@ -530,7 +630,7 @@ private static void importUnpartitionedSparkTable( conf, metricsConfig, nameMapping, - parallelism); + service); if (checkDuplicateFiles) { Dataset importedFiles = @@ -600,6 +700,35 @@ public static void importSparkPartitions( String stagingDir, boolean checkDuplicateFiles, int parallelism) { + importSparkPartitions( + spark, + partitions, + targetTable, + spec, + stagingDir, + checkDuplicateFiles, + TableMigrationUtil.migrationService(parallelism)); + } + + /** + * Import files from given partitions to an Iceberg table. 
+ * + * @param spark a Spark session + * @param partitions partitions to import + * @param targetTable an Iceberg table where to import the data + * @param spec a partition spec + * @param stagingDir a staging directory to store temporary manifest files + * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file + * @param service executor service to use for file reading + */ + public static void importSparkPartitions( + SparkSession spark, + List partitions, + Table targetTable, + PartitionSpec spec, + String stagingDir, + boolean checkDuplicateFiles, + ExecutorService service) { Configuration conf = spark.sessionState().newHadoopConf(); SerializableConfiguration serializableConf = new SerializableConfiguration(conf); int listingParallelism = @@ -627,7 +756,7 @@ public static void importSparkPartitions( serializableConf, metricsConfig, nameMapping, - parallelism) + service) .iterator(), Encoders.javaSerialization(DataFile.class)); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java index 5f3cdd3f035c..bdffeb465405 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.actions; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; @@ -59,6 +60,7 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction 0, "Parallelism should be larger than 0"); + migrateTableSparkAction = + migrateTableSparkAction.executeWith(executorService(parallelism, "table-migration")); + } + MigrateTable.Result result = migrateTableSparkAction.execute(); return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())}; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java index 7a015a51e8ed..f709f64ebf62 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java @@ -38,7 +38,8 @@ class SnapshotTableProcedure extends BaseProcedure { ProcedureParameter.required("source_table", DataTypes.StringType), ProcedureParameter.required("table", DataTypes.StringType), ProcedureParameter.optional("location", DataTypes.StringType), - ProcedureParameter.optional("properties", STRING_MAP) + ProcedureParameter.optional("properties", STRING_MAP), + ProcedureParameter.optional("parallelism", DataTypes.IntegerType) }; private static final StructType OUTPUT_TYPE = @@ -102,6 +103,12 @@ public InternalRow[] call(InternalRow args) { action.tableLocation(snapshotLocation); } + if (!args.isNullAt(4)) { + int parallelism = args.getInt(4); + Preconditions.checkArgument(parallelism > 0, "Parallelism should be larger than 0"); + action = action.executeWith(executorService(parallelism, "table-snapshot")); + } + SnapshotTable.Result result = action.tableProperties(properties).execute(); return new InternalRow[] {newInternalRow(result.importedDataFilesCount())}; } diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
new file mode 100644
index 000000000000..6989e2a15cb8
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestMigrateTableAction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMigrateTableAction extends SparkCatalogTestBase {
+  private static final String TABLE_NAME = "spark_catalog.default.table";
+
+  public TestMigrateTableAction(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", TABLE_NAME);
+  }
+
+  @Test
+  public void testMigrateWithParallelTasks() throws IOException {
+    String location = temp.newFolder().toURI().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        TABLE_NAME, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", TABLE_NAME);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", TABLE_NAME);
+
+    AtomicInteger migrationThreadsIndex = new AtomicInteger(0);
+    SparkActions.get()
+        .migrateTable(TABLE_NAME)
+        .executeWith(
+            Executors.newFixedThreadPool(
+                4,
+                runnable -> {
+                  Thread thread = new Thread(runnable);
+                  thread.setName("table-migration-" + migrationThreadsIndex.getAndIncrement());
+                  thread.setDaemon(true);
+                  return thread;
+                }))
+        .execute();
+    Assert.assertEquals(2, migrationThreadsIndex.get());
+  }
+}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
new file mode 100644
index 000000000000..412a73870d22
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestSnapshotTableAction extends SparkCatalogTestBase {
+  private static final String TABLE_NAME = "spark_catalog.default.table";
+  private static final String SOURCE_NAME = "spark_catalog.default.source";
+
+  public TestSnapshotTableAction(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
+    sql("DROP TABLE IF EXISTS %s PURGE", SOURCE_NAME);
+  }
+
+  @Test
+  public void testSnapshotWithParallelTasks() throws IOException {
+    String location = temp.newFolder().toURI().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        SOURCE_NAME, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", SOURCE_NAME);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", SOURCE_NAME);
+
+    AtomicInteger snapshotThreadsIndex = new AtomicInteger(0);
+    SparkActions.get()
+        .snapshotTable(SOURCE_NAME)
+        .as(TABLE_NAME)
+        .executeWith(
+            Executors.newFixedThreadPool(
+                4,
+                runnable -> {
+                  Thread thread = new Thread(runnable);
+                  thread.setName("table-snapshot-" + snapshotThreadsIndex.getAndIncrement());
+                  thread.setDaemon(true);
+                  return thread;
+                }))
+        .execute();
+    Assert.assertEquals(2, snapshotThreadsIndex.get());
+  }
+}
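
Usage sketch (not part of the patch): with this change applied, file reading during
snapshot/migrate can be parallelized either through the procedures' new optional
parallelism argument, e.g. CALL spark_catalog.system.migrate(table => 'db.sample',
parallelism => 8), or by handing an executor to the action API, as the new tests do.
Below is a minimal Java sketch of the latter; the table names are hypothetical and an
active Spark session is assumed.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.iceberg.actions.SnapshotTable;
    import org.apache.iceberg.spark.actions.SparkActions;

    public class SnapshotWithParallelism {
      public static void main(String[] args) {
        // The pool is used only to read and list data files while building the snapshot.
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
          SnapshotTable.Result result =
              SparkActions.get()
                  .snapshotTable("db.sample") // hypothetical source table
                  .as("db.sample_iceberg") // hypothetical destination table
                  .executeWith(pool) // honored for file reading as of this patch
                  .execute();
          System.out.println("Imported data files: " + result.importedDataFilesCount());
        } finally {
          pool.shutdown();
        }
      }
    }

A fixed-size pool bounds memory and request concurrency against the underlying object
store; sizing it near the store's request limit is generally a safer default than one
thread per file.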