Spark 3.5: Parallelize reading files in snapshot and migrate procedures (#10037)
manuzhang authored Jun 26, 2024
1 parent c88e942 commit 87a988f
Showing 13 changed files with 459 additions and 23 deletions.
12 changes: 12 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.actions;

import java.util.Map;
import java.util.concurrent.ExecutorService;

/** An action that migrates an existing table to Iceberg. */
public interface MigrateTable extends Action<MigrateTable, MigrateTable.Result> {
@@ -60,6 +61,17 @@ default MigrateTable backupTableName(String tableName) {
throw new UnsupportedOperationException("Backup table name cannot be specified");
}

/**
* Sets the executor service to use for parallel file reading. By default, no executor service
* is used.
*
* @param service executor service
* @return this for method chaining
*/
default MigrateTable executeWith(ExecutorService service) {
throw new UnsupportedOperationException("Setting executor service is not supported");
}

/** The action result that contains a summary of the execution. */
interface Result {
/** Returns the number of migrated data files. */
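The new `executeWith` hook is exercised through the Spark action that implements this interface. A minimal sketch of a caller, assuming a configured Spark session and a hypothetical `db.sample` table; `SparkActions` is Iceberg's Spark entry point, and `migratedDataFilesCount()` comes from the `Result` interface above:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.spark.actions.SparkActions;

// Sketch: run the migrate action with a 4-thread pool for file reading.
// "db.sample" is a placeholder table name.
ExecutorService pool = Executors.newFixedThreadPool(4);
try {
  MigrateTable.Result result =
      SparkActions.get()
          .migrateTable("db.sample")
          .executeWith(pool) // hook added by this commit; default stays single-threaded
          .execute();
  System.out.println("Migrated data files: " + result.migratedDataFilesCount());
} finally {
  pool.shutdown();
}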
12 changes: 12 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.actions;

import java.util.Map;
import java.util.concurrent.ExecutorService;

/** An action that creates an independent snapshot of an existing table. */
public interface SnapshotTable extends Action<SnapshotTable, SnapshotTable.Result> {
@@ -57,6 +58,17 @@ public interface SnapshotTable extends Action<SnapshotTable, SnapshotTable.Result> {
*/
SnapshotTable tableProperty(String key, String value);

/**
* Sets the executor service to use for parallel file reading. By default, no executor service
* is used.
*
* @param service executor service
* @return this for method chaining
*/
default SnapshotTable executeWith(ExecutorService service) {
throw new UnsupportedOperationException("Setting executor service is not supported");
}

/** The action result that contains a summary of the execution. */
interface Result {
/** Returns the number of imported data files. */
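`SnapshotTable` gains the same hook. A sketch of the snapshot variant under the same assumptions (placeholder table names, `SparkActions` entry point), where `importedDataFilesCount()` is from the `Result` interface above:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iceberg.actions.SnapshotTable;
import org.apache.iceberg.spark.actions.SparkActions;

// Sketch: snapshot a Spark table into a new Iceberg table, reading file
// metadata with a 4-thread pool. Table names are placeholders.
ExecutorService pool = Executors.newFixedThreadPool(4);
try {
  SnapshotTable.Result result =
      SparkActions.get()
          .snapshotTable("db.source")   // hypothetical source table
          .as("db.source_iceberg")      // hypothetical destination table
          .executeWith(pool)
          .execute();
  System.out.println("Imported data files: " + result.importedDataFilesCount());
} finally {
  pool.shutdown();
}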
57 changes: 44 additions & 13 deletions data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -24,8 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -44,9 +42,8 @@
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;

public class TableMigrationUtil {
private static final PathFilter HIDDEN_PATH_FILTER =
@@ -112,7 +109,46 @@ public static List<DataFile> listPartition(
MetricsConfig metricsSpec,
NameMapping mapping,
int parallelism) {
ExecutorService service = null;
return listPartition(
partition,
partitionUri,
format,
spec,
conf,
metricsSpec,
mapping,
migrationService(parallelism));
}

/**
* Returns the data files in a partition by listing the partition location. Metrics are read from
* the files, and file reading is parallelized by the given executor service if one is provided.
*
* <p>For Parquet and ORC partitions, this will read metrics from the file footer. For Avro
* partitions, metrics other than row count are set to null.
*
* <p>Note: certain metrics, like NaN counts, that are only supported by Iceberg file writers but
* not file footers, will not be populated.
*
* @param partition map of column names to column values for the partition
* @param partitionUri partition location URI
* @param format partition format, avro, parquet or orc
* @param spec a partition spec
* @param conf a Hadoop conf
* @param metricsSpec a metrics conf
* @param mapping a name mapping
* @param service executor service to use for file reading
* @return a List of DataFile
*/
public static List<DataFile> listPartition(
Map<String, String> partition,
String partitionUri,
String format,
PartitionSpec spec,
Configuration conf,
MetricsConfig metricsSpec,
NameMapping mapping,
ExecutorService service) {
try {
List<String> partitionValues =
spec.fields().stream()
@@ -130,8 +166,7 @@ public static List<DataFile> listPartition(
Tasks.Builder<Integer> task =
Tasks.range(fileStatus.size()).stopOnFailure().throwFailureWhenFinished();

if (parallelism > 1) {
service = migrationService(parallelism);
if (service != null) {
task.executeWith(service);
}

@@ -215,11 +250,7 @@ private static DataFile buildDataFile(
.build();
}

private static ExecutorService migrationService(int parallelism) {
return MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor)
Executors.newFixedThreadPool(
parallelism,
new ThreadFactoryBuilder().setNameFormat("table-migration-%d").build()));
public static ExecutorService migrationService(int parallelism) {
return parallelism == 1 ? null : ThreadPools.newWorkerPool("table-migration", parallelism);
}
}
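Since `migrationService` is now public, a direct caller can build the pool itself and pass it through the new overload. A minimal sketch, assuming an unpartitioned Parquet directory at a hypothetical location; note that `migrationService(1)` returns `null`, which keeps `listPartition` single-threaded:

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.data.TableMigrationUtil;

// Sketch: list one partition's data files, reading Parquet footers with
// four threads. The location is a placeholder.
ExecutorService service = TableMigrationUtil.migrationService(4);
List<DataFile> files =
    TableMigrationUtil.listPartition(
        Collections.emptyMap(),       // unpartitioned: no partition values
        "file:/tmp/warehouse/sample", // hypothetical partition location
        "parquet",
        PartitionSpec.unpartitioned(),
        new Configuration(),
        MetricsConfig.getDefault(),
        null,                         // no name mapping
        service);
if (service != null) {
  service.shutdown();
}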
4 changes: 3 additions & 1 deletion docs/docs/spark-procedures.md
@@ -546,6 +546,7 @@ See [`migrate`](#migrate) to replace an existing table with an Iceberg table.
| `table` | ✔️ | string | Name of the new Iceberg table to create |
| `location` | | string | Table location for the new table (delegated to the catalog by default) |
| `properties` || map<string, string> | Properties to add to the newly created table |
| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |

#### Output

@@ -588,6 +589,7 @@ By default, the original table is retained with the name `table_BACKUP_`.
| `properties` || map<string, string> | Properties for the new Iceberg table |
| `drop_backup` | | boolean | When true, the original table will not be retained as backup (defaults to false) |
| `backup_table_name` | | string | Name of the table that will be retained as backup (defaults to `table_BACKUP_`) |
| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |

#### Output

@@ -629,7 +631,7 @@ will then treat these files as if they are part of the set of files owned by Iceberg.
| `source_table` | ✔️ | string | Table where files should come from, paths are also possible in the form of \`file_format\`.\`path\` |
| `partition_filter` || map<string, string> | A map of partitions in the source table to import from |
| `check_duplicate_files` || boolean | Whether to prevent files existing in the table from being added (defaults to true) |
| `parallelism` | | int | number of threads to use for file reading (defaults to 1) |
| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |

Warning : Schema is not validated, adding files with different schema to the Iceberg table will cause issues.

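As a usage note, the documented `parallelism` argument is passed like any other named procedure argument. A sketch from a Spark session (`spark`, the catalog, and the table names are placeholders):

// Sketch: the new named argument in both procedures.
spark.sql("CALL spark_catalog.system.migrate(table => 'db.sample', parallelism => 4)");
spark.sql(
    "CALL spark_catalog.system.snapshot("
        + "source_table => 'db.source', table => 'db.dest', parallelism => 4)");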
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -24,6 +24,7 @@

import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
@@ -231,4 +232,45 @@ public void testMigrateEmptyTable() throws Exception {
Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
assertThat(result).isEqualTo(0L);
}

@TestTemplate
public void testMigrateWithParallelism() throws IOException {
assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");

String location = Files.createTempDirectory(temp, "junit").toFile().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
tableName, location);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

List<Object[]> result =
sql("CALL %s.system.migrate(table => '%s', parallelism => %d)", catalogName, tableName, 2);
assertEquals("Procedure output must match", ImmutableList.of(row(2L)), result);

assertEquals(
"Should have expected rows",
ImmutableList.of(row(1L, "a"), row(2L, "b")),
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@TestTemplate
public void testMigrateWithInvalidParallelism() throws IOException {
assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");

String location = Files.createTempDirectory(temp, "junit").toFile().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
tableName, location);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);

assertThatThrownBy(
() ->
sql(
"CALL %s.system.migrate(table => '%s', parallelism => %d)",
catalogName, tableName, -1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Parallelism should be larger than 0");
}
}
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
@@ -24,6 +24,7 @@

import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
@@ -223,4 +224,42 @@ public void testInvalidSnapshotsCases() throws IOException {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
}

@TestTemplate
public void testSnapshotWithParallelism() throws IOException {
String location = Files.createTempDirectory(temp, "junit").toFile().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
sourceName, location);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName);

List<Object[]> result =
sql(
"CALL %s.system.snapshot(source_table => '%s', table => '%s', parallelism => %d)",
catalogName, sourceName, tableName, 2);
assertEquals("Procedure output must match", ImmutableList.of(row(2L)), result);
assertEquals(
"Should have expected rows",
ImmutableList.of(row(1L, "a"), row(2L, "b")),
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@TestTemplate
public void testSnapshotWithInvalidParallelism() throws IOException {
String location = Files.createTempDirectory(temp, "junit").toFile().toString();
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
sourceName, location);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName);

assertThatThrownBy(
() ->
sql(
"CALL %s.system.snapshot(source_table => '%s', table => '%s', parallelism => %d)",
catalogName, sourceName, tableName, -1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Parallelism should be larger than 0");
}
}