From 9b2b93a085fe4d9ce02a375768b6eb428ba462f6 Mon Sep 17 00:00:00 2001 From: aokolnychyi Date: Tue, 31 Oct 2023 15:34:13 -0700 Subject: [PATCH] Core, Spark: Avoid manifest copies when importing data to V2 tables --- .../java/org/apache/iceberg/AppendFiles.java | 19 +- .../java/org/apache/iceberg/FastAppend.java | 10 +- .../iceberg/MergingSnapshotProducer.java | 9 +- .../org/apache/iceberg/TestFastAppend.java | 28 +- .../org/apache/iceberg/TestMergeAppend.java | 80 +++- .../org/apache/iceberg/TestTransaction.java | 7 +- docs/spark-procedures.md | 4 +- .../apache/iceberg/spark/SparkTableUtil.java | 6 +- .../extensions/TestAddFilesProcedure.java | 407 +++++++++--------- .../apache/iceberg/spark/SparkTableUtil.java | 6 +- .../spark/procedures/AddFilesProcedure.java | 15 +- .../extensions/TestAddFilesProcedure.java | 407 +++++++++--------- .../apache/iceberg/spark/SparkTableUtil.java | 6 +- .../spark/procedures/AddFilesProcedure.java | 15 +- .../extensions/TestAddFilesProcedure.java | 407 +++++++++--------- .../apache/iceberg/spark/SparkTableUtil.java | 6 +- .../spark/procedures/AddFilesProcedure.java | 15 +- 17 files changed, 766 insertions(+), 681 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/AppendFiles.java b/api/src/main/java/org/apache/iceberg/AppendFiles.java index 1fc249acf6ec..e36df7af20c9 100644 --- a/api/src/main/java/org/apache/iceberg/AppendFiles.java +++ b/api/src/main/java/org/apache/iceberg/AppendFiles.java @@ -42,16 +42,17 @@ public interface AppendFiles extends SnapshotUpdate { *

The manifest must contain only appended files. All files in the manifest will be appended to * the table in the snapshot created by this update. * - *

By default, the manifest will be rewritten to assign all entries this update's snapshot ID. - * In that case, it is always the responsibility of the caller to manage the lifecycle of the - * original manifest. + *

The manifest will be used directly if snapshot ID inheritance is enabled (all tables with + format version > 1, or tables where inheritance is enabled explicitly via table properties). + Otherwise, the manifest will be rewritten to assign all entries this update's snapshot ID. + *
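For example, assuming a pre-written {@code ManifestFile manifest} (names illustrative), calling + {@code table.newAppend().appendManifest(manifest).commit()} against a format version 2 table should + commit the manifest file as-is, while a format version 1 table without explicit inheritance is + expected to rewrite it into a new manifest first. + *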

If manifest entries are allowed to inherit the snapshot ID assigned on commit, the manifest - * should never be deleted manually if the commit succeeds as it will become part of the table - * metadata and will be cleaned up on expiry. If the manifest gets merged with others while - * preparing a new snapshot, it will be deleted automatically if this operation is successful. If - * the commit fails, the manifest will never be deleted and it is up to the caller whether to - * delete or reuse it. + *

If the manifest is rewritten, it is always the responsibility of the caller to manage the + * lifecycle of the original manifest. If manifest entries are allowed to inherit the snapshot ID + * assigned on commit, the manifest should never be deleted manually if the commit succeeds as it + * will become part of the table metadata and will be cleaned upon expiry. If the manifest gets + * merged with others while preparing a new snapshot, it will be deleted automatically if this + * operation is successful. If the commit fails, the manifest will never be deleted, and it is up + * to the caller whether to delete or reuse it. * * @param file a manifest file * @return this for method chaining diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 9619ec087ad6..734cab2efe46 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -18,9 +18,6 @@ */ package org.apache.iceberg; -import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED; -import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT; - import java.io.IOException; import java.util.List; import java.util.Map; @@ -44,7 +41,6 @@ class FastAppend extends SnapshotProducer implements AppendFiles { private final String tableName; private final TableOperations ops; private final PartitionSpec spec; - private final boolean snapshotIdInheritanceEnabled; private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); private final List newFiles = Lists.newArrayList(); private final List appendManifests = Lists.newArrayList(); @@ -57,10 +53,6 @@ class FastAppend extends SnapshotProducer implements AppendFiles { this.tableName = tableName; this.ops = ops; this.spec = ops.current().spec(); - this.snapshotIdInheritanceEnabled = - ops.current() - .propertyAsBoolean( - SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); } @Override @@ -115,7 +107,7 @@ public FastAppend appendManifest(ManifestFile manifest) { Preconditions.checkArgument( manifest.sequenceNumber() == -1, "Sequence number must be assigned during commit"); - if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { + if (canInheritSnapshotId() && manifest.snapshotId() == null) { summaryBuilder.addedManifest(manifest); appendManifests.add(manifest); } else { diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 1dcfa6d3d41d..e06a4910986d 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -22,8 +22,6 @@ import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT; import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES; import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT; -import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED; -import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT; import java.io.IOException; import java.io.UncheckedIOException; @@ -80,7 +78,6 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private final ManifestFilterManager filterManager; private final ManifestMergeManager deleteMergeManager; private final ManifestFilterManager deleteFilterManager; - private final boolean 
snapshotIdInheritanceEnabled; // update data private final List newDataFiles = Lists.newArrayList(); @@ -123,10 +120,6 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { this.deleteMergeManager = new DeleteFileMergeManager(targetSizeBytes, minCountToMerge, mergeEnabled); this.deleteFilterManager = new DeleteFileFilterManager(); - this.snapshotIdInheritanceEnabled = - ops.current() - .propertyAsBoolean( - SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); } @Override @@ -271,7 +264,7 @@ private void setDataSpec(DataFile file) { protected void add(ManifestFile manifest) { Preconditions.checkArgument( manifest.content() == ManifestContent.DATA, "Cannot append delete manifest: %s", manifest); - if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { + if (canInheritSnapshotId() && manifest.snapshotId() == null) { appendedManifestsSummary.addedManifest(manifest); appendManifests.add(manifest); } else { diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index 6ed52fd5ad35..a871b4e00c24 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.assertj.core.api.Assertions; @@ -83,6 +84,13 @@ public void testEmptyTableAppendManifest() throws IOException { validateSnapshot(base.currentSnapshot(), snap, 1, FILE_A, FILE_B); + ManifestFile committedManifest = Iterables.getOnlyElement(snap.allManifests(FILE_IO)); + if (formatVersion == 1) { + Assertions.assertThat(committedManifest.path()).isNotEqualTo(manifest.path()); + } else { + Assertions.assertThat(committedManifest.path()).isEqualTo(manifest.path()); + } + // validate that the metadata summary is correct when using appendManifest Assert.assertEquals( "Summary metadata should include 2 added files", @@ -126,6 +134,12 @@ public void testEmptyTableAppendFilesAndManifest() throws IOException { ids(commitId, commitId), files(FILE_A, FILE_B)); + if (formatVersion == 1) { + Assertions.assertThat(snap.allManifests(FILE_IO).get(1).path()).isNotEqualTo(manifest.path()); + } else { + Assertions.assertThat(snap.allManifests(FILE_IO).get(1).path()).isEqualTo(manifest.path()); + } + V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap.sequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); @@ -257,12 +271,21 @@ public void testAppendManifestCleanup() throws IOException { Snapshot pending = append.apply(); ManifestFile newManifest = pending.allManifests(FILE_IO).get(0); Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists()); + if (formatVersion == 1) { + Assertions.assertThat(newManifest.path()).isNotEqualTo(manifest.path()); + } else { + Assertions.assertThat(newManifest.path()).isEqualTo(manifest.path()); + } Assertions.assertThatThrownBy(append::commit) .isInstanceOf(CommitFailedException.class) .hasMessage("Injected failure"); - Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists()); + if (formatVersion == 1) { + 
Assertions.assertThat(new File(newManifest.path())).doesNotExist(); + } else { + Assertions.assertThat(new File(newManifest.path())).exists(); + } } @Test @@ -327,7 +350,8 @@ public void testAppendManifestWithSnapshotIdInheritance() throws IOException { Snapshot snapshot = table.currentSnapshot(); List manifests = table.currentSnapshot().allManifests(FILE_IO); - Assert.assertEquals("Should have 1 committed manifest", 1, manifests.size()); + ManifestFile committedManifest = Iterables.getOnlyElement(manifests); + Assertions.assertThat(committedManifest.path()).isEqualTo(manifest.path()); validateManifestEntries( manifests.get(0), diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index 892d92634c09..39c9ac4b6c21 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -30,6 +30,7 @@ import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; @@ -107,14 +108,17 @@ public void testEmptyTableAppendManifest() throws IOException { "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber()); V2Assert.assertEquals( "Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber()); - Assert.assertEquals( - "Should create 1 manifest for initial write", - 1, - committedSnapshot.allManifests(table.io()).size()); + List manifests = committedSnapshot.allManifests(table.io()); + ManifestFile committedManifest = Iterables.getOnlyElement(manifests); + if (formatVersion == 1) { + Assertions.assertThat(committedManifest.path()).isNotEqualTo(manifest.path()); + } else { + Assertions.assertThat(committedManifest.path()).isEqualTo(manifest.path()); + } long snapshotId = committedSnapshot.snapshotId(); validateManifest( - committedSnapshot.allManifests(table.io()).get(0), + committedManifest, dataSeqs(1L, 1L), fileSeqs(1L, 1L), ids(snapshotId, snapshotId), @@ -155,8 +159,17 @@ public void testEmptyTableAppendFilesAndManifest() throws IOException { long snapshotId = committedSnapshot.snapshotId(); + ManifestFile committedManifest1 = committedSnapshot.allManifests(table.io()).get(0); + ManifestFile committedManifest2 = committedSnapshot.allManifests(table.io()).get(1); + + if (formatVersion == 1) { + Assertions.assertThat(committedManifest2.path()).isNotEqualTo(manifest.path()); + } else { + Assertions.assertThat(committedManifest2.path()).isEqualTo(manifest.path()); + } + validateManifest( - committedSnapshot.allManifests(table.io()).get(0), + committedManifest1, dataSeqs(1L, 1L), fileSeqs(1L, 1L), ids(snapshotId, snapshotId), @@ -164,7 +177,7 @@ public void testEmptyTableAppendFilesAndManifest() throws IOException { statuses(Status.ADDED, Status.ADDED)); validateManifest( - committedSnapshot.allManifests(table.io()).get(1), + committedManifest2, dataSeqs(1L, 1L), fileSeqs(1L, 1L), ids(snapshotId, snapshotId), @@ -229,11 +242,13 @@ public void testMergeWithAppendFilesAndManifest() throws IOException { long snapshotId = committedSnapshot.snapshotId(); - Assert.assertEquals( - "Should create 1 merged manifest", 1, committedSnapshot.allManifests(table.io()).size()); + List manifests = 
committedSnapshot.allManifests(table.io()); + ManifestFile committedManifest = Iterables.getOnlyElement(manifests); + + Assertions.assertThat(committedManifest.path()).isNotEqualTo(manifest.path()); validateManifest( - committedSnapshot.allManifests(table.io()).get(0), + committedManifest, dataSeqs(1L, 1L, 1L, 1L), fileSeqs(1L, 1L, 1L, 1L), ids(snapshotId, snapshotId, snapshotId, snapshotId), @@ -316,7 +331,7 @@ public void testManifestMergeMinCount() throws IOException { Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); Assert.assertEquals("Last sequence number should be 0", 0, base.lastSequenceNumber()); - ManifestFile manifest = writeManifest(FILE_A); + ManifestFile manifest = writeManifestWithName("FILE_A", FILE_A); ManifestFile manifest2 = writeManifestWithName("FILE_C", FILE_C); ManifestFile manifest3 = writeManifestWithName("FILE_D", FILE_D); Snapshot snap1 = @@ -355,6 +370,11 @@ public void testManifestMergeMinCount() throws IOException { files(FILE_C, FILE_D), statuses(Status.ADDED, Status.ADDED)); + // produce new manifests as the old ones could have been compacted + manifest = writeManifestWithName("FILE_A_S2", FILE_A); + manifest2 = writeManifestWithName("FILE_C_S2", FILE_C); + manifest3 = writeManifestWithName("FILE_D_S2", FILE_D); + Snapshot snap2 = commit( table, @@ -546,29 +566,41 @@ public void testManifestDoNotMergeMinCount() throws IOException { V2Assert.assertEquals( "Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber()); - Assert.assertEquals( - "Should contain 3 merged manifest after 1st write write", - 3, - committed.allManifests(table.io()).size()); + List manifests = committed.allManifests(table.io()); + Assertions.assertThat(manifests).hasSize(3); + + ManifestFile committedManifest = manifests.get(0); + ManifestFile committedManifest2 = manifests.get(1); + ManifestFile committedManifest3 = manifests.get(2); long snapshotId = committed.snapshotId(); + if (formatVersion == 1) { + Assertions.assertThat(committedManifest.path()).isNotEqualTo(manifest.path()); + Assertions.assertThat(committedManifest2.path()).isNotEqualTo(manifest2.path()); + Assertions.assertThat(committedManifest3.path()).isNotEqualTo(manifest3.path()); + } else { + Assertions.assertThat(committedManifest.path()).isEqualTo(manifest.path()); + Assertions.assertThat(committedManifest2.path()).isEqualTo(manifest2.path()); + Assertions.assertThat(committedManifest3.path()).isEqualTo(manifest3.path()); + } + validateManifest( - committed.allManifests(table.io()).get(0), + committedManifest, dataSeqs(1L, 1L), fileSeqs(1L, 1L), ids(snapshotId, snapshotId), files(FILE_A, FILE_B), statuses(Status.ADDED, Status.ADDED)); validateManifest( - committed.allManifests(table.io()).get(1), + committedManifest2, dataSeqs(1L), fileSeqs(1L), ids(snapshotId), files(FILE_C), statuses(Status.ADDED)); validateManifest( - committed.allManifests(table.io()).get(2), + committedManifest3, dataSeqs(1L), fileSeqs(1L), ids(snapshotId), @@ -981,6 +1013,11 @@ public void testAppendManifestCleanup() throws IOException { Snapshot pending = apply(append, branch); ManifestFile newManifest = pending.allManifests(table.io()).get(0); Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists()); + if (formatVersion == 1) { + Assertions.assertThat(newManifest.path()).isNotEqualTo(manifest.path()); + } else { + Assertions.assertThat(newManifest.path()).isEqualTo(manifest.path()); + } Assertions.assertThatThrownBy(() -> commit(table, append, branch)) 
.isInstanceOf(CommitFailedException.class) @@ -990,7 +1027,11 @@ public void testAppendManifestCleanup() throws IOException { V1Assert.assertEquals( "Table should end with last-sequence-number 0", 0, readMetadata().lastSequenceNumber()); - Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists()); + if (formatVersion == 1) { + Assertions.assertThat(new File(newManifest.path())).doesNotExist(); + } else { + Assertions.assertThat(new File(newManifest.path())).exists(); + } } @Test @@ -1084,6 +1125,7 @@ public void testAppendManifestWithSnapshotIdInheritance() throws IOException { List manifests = snapshot.allManifests(table.io()); Assert.assertEquals("Should have 1 committed manifest", 1, manifests.size()); ManifestFile manifestFile = snapshot.allManifests(table.io()).get(0); + Assertions.assertThat(manifestFile.path()).isEqualTo(manifest.path()); validateManifest( manifestFile, dataSeqs(1L, 1L), diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index b76974e2440d..3b30758059e2 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -31,6 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.assertj.core.api.Assertions; +import org.assertj.core.api.Assumptions; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -504,7 +505,11 @@ public void testTransactionRetryMergeCleanup() { } @Test - public void testTransactionRetryAndAppendManifests() throws Exception { + public void testTransactionRetryAndAppendManifestsWithoutSnapshotIdInheritance() + throws Exception { + // this test assumes append manifests are rewritten, which only happens in V1 tables + Assumptions.assumeThat(formatVersion).isEqualTo(1); + // use only one retry and aggressively merge manifests table .updateProperties() diff --git a/docs/spark-procedures.md b/docs/spark-procedures.md index 00cb91f0d131..3a637c665cab 100644 --- a/docs/spark-procedures.md +++ b/docs/spark-procedures.md @@ -649,10 +649,10 @@ Warning : Files added by this method can be physically deleted by Iceberg operat | Output Name | Type | Description | |---------------------------|------|---------------------------------------------------| | `added_files_count` | long | The number of files added by this command | -| `changed_partition_count` | long | The number of partitioned changed by this command | +| `changed_partition_count` | long | The number of partitioned changed by this command (if known) | {{< hint warning >}} -changed_partition_count will be 0 when table property `compatibility.snapshot-id-inheritance.enabled` is set to true or if the table format version is > 1. +changed_partition_count will be NULL when table property `compatibility.snapshot-id-inheritance.enabled` is set to true or if the table format version is > 1. 
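+For example, calling `CALL spark_catalog.system.add_files('db.tbl', 'db.src_tbl')` (illustrative identifiers) against a format version 2 table may report `2` added files with a NULL `changed_partition_count`, while the same call against a format version 1 table would report the actual changed partition count (for example, `1`).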
{{< /hint >}} #### Examples diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index d1956322907b..38d51167e927 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -34,6 +34,7 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; @@ -42,6 +43,7 @@ import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.TableMigrationUtil; import org.apache.iceberg.hadoop.HadoopFileIO; @@ -632,6 +634,8 @@ public static void importSparkPartitions( .collectAsList(); try { + TableOperations ops = ((HasTableOperations) targetTable).operations(); + int formatVersion = ops.current().formatVersion(); boolean snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean( targetTable.properties(), @@ -642,7 +646,7 @@ public static void importSparkPartitions( manifests.forEach(append::appendManifest); append.commit(); - if (!snapshotIdInheritanceEnabled) { + if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { // delete original manifests as they were rewritten before the commit deleteManifests(targetTable.io(), manifests); } diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java index e02065bd6347..a48deae7421f 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.util.List; @@ -36,7 +38,9 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; @@ -53,15 +57,42 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runners.Parameterized.Parameters; public class TestAddFilesProcedure extends SparkExtensionsTestBase { + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties(), + 1 + }, + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + 2 + }, + { + SparkCatalogConfig.SPARK.catalogName(), 
+ SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + 2 + } + }; + } + + private final int formatVersion; private final String sourceTableName = "source_table"; private File fileTableDir; public TestAddFilesProcedure( - String catalogName, String implementation, Map config) { + String catalogName, String implementation, Map config, int formatVersion) { super(catalogName, implementation, config); + this.formatVersion = formatVersion; } @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -85,17 +116,14 @@ public void dropTables() { public void addDataUnpartitioned() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -107,10 +135,7 @@ public void addDataUnpartitioned() { public void deleteAndAddBackUnpartitioned() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", @@ -124,7 +149,7 @@ public void deleteAndAddBackUnpartitioned() { "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -183,14 +208,13 @@ public void addAvroFile() throws Exception { dataFileWriter.append(record2); dataFileWriter.close(); - String createIceberg = "CREATE TABLE %s (id Long, data String) USING iceberg"; - sql(createIceberg, tableName); + createIcebergTable("id Long, data String"); List result = sql( "CALL %s.system.add_files('%s', '`avro`.`%s`')", catalogName, tableName, outputFile.getPath()); - assertEquals("Procedure output must match", ImmutableList.of(row(1L, 1L)), result); + assertOutput(result, 1L, 1L); List expected = Lists.newArrayList(new Object[] {1L, "a"}, new Object[] {2L, "b"}); @@ -236,15 +260,12 @@ public void addDataUnpartitionedAvro() { public void addDataUnpartitionedHive() { createUnpartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); List result = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -256,17 +277,14 @@ public void addDataUnpartitionedHive() { public void addDataUnpartitionedExtraCol() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String, foo string) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, 
dept String, subdept String, foo string"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -278,16 +296,14 @@ public void addDataUnpartitionedExtraCol() { public void addDataUnpartitionedMissingCol() { createUnpartitionedFileTable("parquet"); - String createIceberg = "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -299,17 +315,14 @@ public void addDataUnpartitionedMissingCol() { public void addDataPartitionedMissingCol() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(8L, 4L)), result); + assertOutput(result, 8L, 4L); assertEquals( "Iceberg table contains correct data", @@ -321,17 +334,15 @@ public void addDataPartitionedMissingCol() { public void addDataPartitioned() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(8L, 4L)), result); + assertOutput(result, 8L, 4L); assertEquals( "Iceberg table contains correct data", @@ -388,15 +399,13 @@ public void addDataPartitionedAvro() { public void addDataPartitionedHive() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(8L, 4L)), result); + assertOutput(result, 8L, 4L); assertEquals( "Iceberg table contains correct data", @@ -408,17 +417,15 @@ public void addDataPartitionedHive() { public void addPartitionToPartitioned() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', 
map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -430,10 +437,8 @@ public void addPartitionToPartitioned() { public void deleteAndAddBackPartitioned() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", @@ -447,7 +452,7 @@ public void deleteAndAddBackPartitioned() { "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -459,11 +464,11 @@ public void deleteAndAddBackPartitioned() { public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)" - + "TBLPROPERTIES ('%s'='true')"; - - sql(createIceberg, tableName, TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", + tableName, TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", @@ -491,17 +496,14 @@ public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() { public void addDataPartitionedByDateToPartitioned() { createDatePartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, date Date) USING iceberg PARTITIONED BY (date)"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, date Date", "PARTITIONED BY (date)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('date', '2021-01-01'))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -513,10 +515,8 @@ public void addDataPartitionedByDateToPartitioned() { public void addDataPartitionedVerifyPartitionTypeInferredCorrectly() { createTableWithTwoPartitions("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, date Date, dept String) USING iceberg PARTITIONED BY (date, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, date Date, dept String", "PARTITIONED BY (date, dept)"); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('date', '2021-01-01'))", @@ -534,18 +534,15 @@ public void addDataPartitionedVerifyPartitionTypeInferredCorrectly() { public void addFilteredPartitionsToPartitioned() { createCompositePartitionedTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name 
String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -557,18 +554,15 @@ public void addFilteredPartitionsToPartitioned() { public void addFilteredPartitionsToPartitioned2() { createCompositePartitionedTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('dept', 'hr'))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(6L, 3L)), result); + assertOutput(result, 6L, 3L); assertEquals( "Iceberg table contains correct data", @@ -582,18 +576,15 @@ public void addFilteredPartitionsToPartitioned2() { public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnId() { createCompositePartitionedTableWithNullValueInPartitionColumn("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -605,18 +596,15 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnId() { public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { createCompositePartitionedTableWithNullValueInPartitionColumn("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('dept', 'hr'))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(6L, 3L)), result); + assertOutput(result, 6L, 3L); assertEquals( "Iceberg table contains correct data", @@ -630,18 +618,15 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { public void addWeirdCaseHiveTable() { createWeirdCaseTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, `naMe` String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (`naMe`)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, `naMe` String, dept String, subdept String", "PARTITIONED BY (`naMe`)"); List result = sql( "CALL %s.system.add_files('%s', '%s', map('naMe', 'John Doe'))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); 
/* While we would like to use @@ -681,17 +666,15 @@ public void addWeirdCaseHiveTable() { public void addPartitionToPartitionedHive() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '%s', map('id', 1))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -703,10 +686,7 @@ public void addPartitionToPartitionedHive() { public void invalidDataImport() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); AssertHelpers.assertThrows( "Should forbid adding of partitioned data to unpartitioned table", @@ -731,10 +711,8 @@ public void invalidDataImport() { public void invalidDataImportPartitioned() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); AssertHelpers.assertThrows( "Should forbid adding with a mismatching partition spec", @@ -759,10 +737,8 @@ public void invalidDataImportPartitioned() { public void addTwice() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result1 = sql( @@ -771,7 +747,7 @@ public void addTwice() { + "source_table => '%s', " + "partition_filter => map('id', 1))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result1); + assertOutput(result1, 2L, 1L); List result2 = sql( @@ -780,7 +756,7 @@ public void addTwice() { + "source_table => '%s', " + "partition_filter => map('id', 2))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result2); + assertOutput(result2, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -796,10 +772,8 @@ public void addTwice() { public void duplicateDataPartitioned() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); sql( "CALL %s.system.add_files(" @@ -826,10 +800,8 @@ public void duplicateDataPartitioned() { public void duplicateDataPartitionedAllowed() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY 
(id)"); List result1 = sql( @@ -839,7 +811,7 @@ public void duplicateDataPartitionedAllowed() { + "partition_filter => map('id', 1))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result1); + assertOutput(result1, 2L, 1L); List result2 = sql( @@ -850,7 +822,7 @@ public void duplicateDataPartitionedAllowed() { + "check_duplicate_files => false)", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result2); + assertOutput(result2, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -865,10 +837,7 @@ public void duplicateDataPartitionedAllowed() { public void duplicateDataUnpartitioned() { createUnpartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); @@ -886,14 +855,11 @@ public void duplicateDataUnpartitioned() { public void duplicateDataUnpartitionedAllowed() { createUnpartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); List result1 = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result1); + assertOutput(result1, 2L, 1L); List result2 = sql( @@ -902,7 +868,7 @@ public void duplicateDataUnpartitionedAllowed() { + "source_table => '%s'," + "check_duplicate_files => false)", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result2); + assertOutput(result2, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -914,17 +880,14 @@ public void duplicateDataUnpartitionedAllowed() { @Test public void testEmptyImportDoesNotThrow() { - - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); // Empty path based import List pathResult = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L)), pathResult); + assertOutput(pathResult, 0L, 0L); assertEquals( "Iceberg table contains no added data when importing from an empty path", emptyQueryResult, @@ -937,7 +900,7 @@ public void testEmptyImportDoesNotThrow() { List tableResult = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L)), tableResult); + assertOutput(tableResult, 0L, 0L); assertEquals( "Iceberg table contains no added data when importing from an empty table", emptyQueryResult, @@ -954,10 +917,8 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { "ALTER TABLE %s ADD PARTITION (id = '%d') LOCATION '%d'", sourceTableName, emptyPartitionId, emptyPartitionId); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - 
sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List tableResult = sql( @@ -967,7 +928,7 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { + "partition_filter => map('id', %d))", catalogName, tableName, sourceTableName, emptyPartitionId); - assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L)), tableResult); + assertOutput(tableResult, 0L, 0L); assertEquals( "Iceberg table contains no added data when importing from an empty table", emptyQueryResult, @@ -983,38 +944,45 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { new StructField("subdept", DataTypes.StringType, true, Metadata.empty()) }; - private static final Dataset unpartitionedDF = - spark - .createDataFrame( - ImmutableList.of( - RowFactory.create(1, "John Doe", "hr", "communications"), - RowFactory.create(2, "Jane Doe", "hr", "salary"), - RowFactory.create(3, "Matt Doe", "hr", "communications"), - RowFactory.create(4, "Will Doe", "facilities", "all")), - new StructType(struct)) - .repartition(1); - - private static final Dataset singleNullRecordDF = - spark - .createDataFrame( - ImmutableList.of(RowFactory.create(null, null, null, null)), new StructType(struct)) - .repartition(1); - - private static final Dataset partitionedDF = - unpartitionedDF.select("name", "dept", "subdept", "id"); - - private static final Dataset compositePartitionedDF = - unpartitionedDF.select("name", "subdept", "id", "dept"); - - private static final Dataset compositePartitionedNullRecordDF = - singleNullRecordDF.select("name", "subdept", "id", "dept"); - - private static final Dataset weirdColumnNamesDF = - unpartitionedDF.select( - unpartitionedDF.col("id"), - unpartitionedDF.col("subdept"), - unpartitionedDF.col("dept"), - unpartitionedDF.col("name").as("naMe")); + private Dataset unpartitionedDF() { + return spark + .createDataFrame( + ImmutableList.of( + RowFactory.create(1, "John Doe", "hr", "communications"), + RowFactory.create(2, "Jane Doe", "hr", "salary"), + RowFactory.create(3, "Matt Doe", "hr", "communications"), + RowFactory.create(4, "Will Doe", "facilities", "all")), + new StructType(struct)) + .repartition(1); + } + + private Dataset singleNullRecordDF() { + return spark + .createDataFrame( + ImmutableList.of(RowFactory.create(null, null, null, null)), new StructType(struct)) + .repartition(1); + } + + private Dataset partitionedDF() { + return unpartitionedDF().select("name", "dept", "subdept", "id"); + } + + private Dataset compositePartitionedDF() { + return unpartitionedDF().select("name", "subdept", "id", "dept"); + } + + private Dataset compositePartitionedNullRecordDF() { + return singleNullRecordDF().select("name", "subdept", "id", "dept"); + } + + private Dataset weirdColumnNamesDF() { + Dataset unpartitionedDF = unpartitionedDF(); + return unpartitionedDF.select( + unpartitionedDF.col("id"), + unpartitionedDF.col("subdept"), + unpartitionedDF.col("dept"), + unpartitionedDF.col("name").as("naMe")); + } private static final StructField[] dateStruct = { new StructField("id", DataTypes.IntegerType, true, Metadata.empty()), @@ -1027,24 +995,26 @@ private static java.sql.Date toDate(String value) { return new java.sql.Date(DateTime.parse(value).getMillis()); } - private static final Dataset dateDF = - spark - .createDataFrame( - ImmutableList.of( - RowFactory.create(1, "John Doe", toDate("2021-01-01"), "01"), - RowFactory.create(2, "Jane Doe", toDate("2021-01-01"), "01"), - 
RowFactory.create(3, "Matt Doe", toDate("2021-01-02"), "02"), - RowFactory.create(4, "Will Doe", toDate("2021-01-02"), "02")), - new StructType(dateStruct)) - .repartition(2); + private Dataset dateDF() { + return spark + .createDataFrame( + ImmutableList.of( + RowFactory.create(1, "John Doe", toDate("2021-01-01"), "01"), + RowFactory.create(2, "Jane Doe", toDate("2021-01-01"), "01"), + RowFactory.create(3, "Matt Doe", toDate("2021-01-02"), "02"), + RowFactory.create(4, "Will Doe", toDate("2021-01-02"), "02")), + new StructType(dateStruct)) + .repartition(2); + } private void createUnpartitionedFileTable(String format) { String createParquet = "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING %s LOCATION '%s'"; sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - unpartitionedDF.write().insertInto(sourceTableName); - unpartitionedDF.write().insertInto(sourceTableName); + Dataset df = unpartitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createPartitionedFileTable(String format) { @@ -1054,8 +1024,9 @@ private void createPartitionedFileTable(String format) { sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - partitionedDF.write().insertInto(sourceTableName); - partitionedDF.write().insertInto(sourceTableName); + Dataset df = partitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createCompositePartitionedTable(String format) { @@ -1064,8 +1035,9 @@ private void createCompositePartitionedTable(String format) { + "PARTITIONED BY (id, dept) LOCATION '%s'"; sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - compositePartitionedDF.write().insertInto(sourceTableName); - compositePartitionedDF.write().insertInto(sourceTableName); + Dataset df = compositePartitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createCompositePartitionedTableWithNullValueInPartitionColumn(String format) { @@ -1075,8 +1047,8 @@ private void createCompositePartitionedTableWithNullValueInPartitionColumn(Strin sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); Dataset unionedDF = - compositePartitionedDF - .unionAll(compositePartitionedNullRecordDF) + compositePartitionedDF() + .unionAll(compositePartitionedNullRecordDF()) .select("name", "subdept", "id", "dept") .repartition(1); @@ -1091,8 +1063,9 @@ private void createWeirdCaseTable() { sql(createParquet, sourceTableName); - weirdColumnNamesDF.write().insertInto(sourceTableName); - weirdColumnNamesDF.write().insertInto(sourceTableName); + Dataset df = weirdColumnNamesDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createUnpartitionedHiveTable() { @@ -1101,8 +1074,9 @@ private void createUnpartitionedHiveTable() { sql(createHive, sourceTableName); - unpartitionedDF.write().insertInto(sourceTableName); - unpartitionedDF.write().insertInto(sourceTableName); + Dataset df = unpartitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createPartitionedHiveTable() { @@ -1112,8 +1086,9 @@ private void createPartitionedHiveTable() { sql(createHive, sourceTableName); - partitionedDF.write().insertInto(sourceTableName); - partitionedDF.write().insertInto(sourceTableName); + Dataset df = partitionedDF(); + df.write().insertInto(sourceTableName); + 
df.write().insertInto(sourceTableName); } private void createDatePartitionedFileTable(String format) { @@ -1123,7 +1098,7 @@ private void createDatePartitionedFileTable(String format) { sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - dateDF.select("id", "name", "ts").write().insertInto(sourceTableName); + dateDF().select("id", "name", "ts").write().insertInto(sourceTableName); } private void createTableWithTwoPartitions(String format) { @@ -1133,6 +1108,28 @@ private void createTableWithTwoPartitions(String format) { sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - dateDF.write().insertInto(sourceTableName); + dateDF().write().insertInto(sourceTableName); + } + + private void createIcebergTable(String schema) { + createIcebergTable(schema, ""); + } + + private void createIcebergTable(String schema, String partitioning) { + sql( + "CREATE TABLE %s (%s) USING iceberg %s TBLPROPERTIES ('%s' '%d')", + tableName, schema, partitioning, TableProperties.FORMAT_VERSION, formatVersion); + } + + private void assertOutput( + List result, long expectedAddedFilesCount, long expectedChangedPartitionCount) { + Object[] output = Iterables.getOnlyElement(result); + assertThat(output[0]).isEqualTo(expectedAddedFilesCount); + if (formatVersion == 1) { + assertThat(output[1]).isEqualTo(expectedChangedPartitionCount); + } else { + // the number of changed partitions may not be populated in v2 tables + assertThat(output[1]).isIn(expectedChangedPartitionCount, null); + } } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 37f3f0c6c48d..6cb31a3b8830 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -34,6 +34,7 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; @@ -42,6 +43,7 @@ import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.TableMigrationUtil; import org.apache.iceberg.exceptions.ValidationException; @@ -633,6 +635,8 @@ public static void importSparkPartitions( .collectAsList(); try { + TableOperations ops = ((HasTableOperations) targetTable).operations(); + int formatVersion = ops.current().formatVersion(); boolean snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean( targetTable.properties(), @@ -643,7 +647,7 @@ public static void importSparkPartitions( manifests.forEach(append::appendManifest); append.commit(); - if (!snapshotIdInheritanceEnabled) { + if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { // delete original manifests as they were rewritten before the commit deleteManifests(targetTable.io(), manifests); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java index b349694130d3..7ed239149d1c 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java +++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java @@ -40,6 +40,7 @@ import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.connector.catalog.CatalogPlugin; @@ -71,7 +72,7 @@ class AddFilesProcedure extends BaseProcedure { new StructType( new StructField[] { new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()), - new StructField("changed_partition_count", DataTypes.LongType, false, Metadata.empty()), + new StructField("changed_partition_count", DataTypes.LongType, true, Metadata.empty()), }); private AddFilesProcedure(TableCatalog tableCatalog) { @@ -117,12 +118,18 @@ public InternalRow[] call(InternalRow args) { private InternalRow[] toOutputRows(Snapshot snapshot) { Map summary = snapshot.summary(); return new InternalRow[] { - newInternalRow( - Long.parseLong(summary.getOrDefault(SnapshotSummary.ADDED_FILES_PROP, "0")), - Long.parseLong(summary.getOrDefault(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "0"))) + newInternalRow(addedFilesCount(summary), changedPartitionCount(summary)) }; } + private long addedFilesCount(Map stats) { + return PropertyUtil.propertyAsLong(stats, SnapshotSummary.ADDED_FILES_PROP, 0L); + } + + private Long changedPartitionCount(Map stats) { + return PropertyUtil.propertyAsNullableLong(stats, SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); + } + private boolean isFileIdentifier(Identifier ident) { String[] namespace = ident.namespace(); return namespace.length == 1 diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java index db8ffe07b8b7..cfb8a9f999f9 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.util.List; @@ -35,7 +37,9 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; @@ -53,15 +57,42 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runners.Parameterized.Parameters; public class TestAddFilesProcedure extends SparkExtensionsTestBase { + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties(), + 1 + }, + { + SparkCatalogConfig.HADOOP.catalogName(), + 
SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + 2 + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + 2 + } + }; + } + + private final int formatVersion; private final String sourceTableName = "source_table"; private File fileTableDir; public TestAddFilesProcedure( - String catalogName, String implementation, Map config) { + String catalogName, String implementation, Map config, int formatVersion) { super(catalogName, implementation, config); + this.formatVersion = formatVersion; } @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -85,17 +116,14 @@ public void dropTables() { public void addDataUnpartitioned() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -107,10 +135,7 @@ public void addDataUnpartitioned() { public void deleteAndAddBackUnpartitioned() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", @@ -124,7 +149,7 @@ public void deleteAndAddBackUnpartitioned() { "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -183,14 +208,13 @@ public void addAvroFile() throws Exception { dataFileWriter.append(record2); dataFileWriter.close(); - String createIceberg = "CREATE TABLE %s (id Long, data String) USING iceberg"; - sql(createIceberg, tableName); + createIcebergTable("id Long, data String"); List result = sql( "CALL %s.system.add_files('%s', '`avro`.`%s`')", catalogName, tableName, outputFile.getPath()); - assertEquals("Procedure output must match", ImmutableList.of(row(1L, 1L)), result); + assertOutput(result, 1L, 1L); List expected = Lists.newArrayList(new Object[] {1L, "a"}, new Object[] {2L, "b"}); @@ -236,15 +260,12 @@ public void addDataUnpartitionedAvro() { public void addDataUnpartitionedHive() { createUnpartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); List result = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -256,17 +277,14 @@ public void addDataUnpartitionedHive() { public void addDataUnpartitionedExtraCol() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name 
String, dept String, subdept String, foo string) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String, foo string"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -278,16 +296,14 @@ public void addDataUnpartitionedExtraCol() { public void addDataUnpartitionedMissingCol() { createUnpartitionedFileTable("parquet"); - String createIceberg = "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -299,17 +315,14 @@ public void addDataUnpartitionedMissingCol() { public void addDataPartitionedMissingCol() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(8L, 4L)), result); + assertOutput(result, 8L, 4L); assertEquals( "Iceberg table contains correct data", @@ -321,17 +334,15 @@ public void addDataPartitionedMissingCol() { public void addDataPartitioned() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(8L, 4L)), result); + assertOutput(result, 8L, 4L); assertEquals( "Iceberg table contains correct data", @@ -388,15 +399,13 @@ public void addDataPartitionedAvro() { public void addDataPartitionedHive() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(8L, 4L)), result); + assertOutput(result, 8L, 4L); assertEquals( "Iceberg table contains correct data", @@ -408,17 +417,15 @@ public void addDataPartitionedHive() { public void addPartitionToPartitioned() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + 
"id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -430,10 +437,8 @@ public void addPartitionToPartitioned() { public void deleteAndAddBackPartitioned() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", @@ -447,7 +452,7 @@ public void deleteAndAddBackPartitioned() { "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -459,11 +464,11 @@ public void deleteAndAddBackPartitioned() { public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)" - + "TBLPROPERTIES ('%s'='true')"; - - sql(createIceberg, tableName, TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", + tableName, TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", @@ -491,17 +496,14 @@ public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() { public void addDataPartitionedByDateToPartitioned() { createDatePartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, date Date) USING iceberg PARTITIONED BY (date)"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, date Date", "PARTITIONED BY (date)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('date', '2021-01-01'))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -513,10 +515,8 @@ public void addDataPartitionedByDateToPartitioned() { public void addDataPartitionedVerifyPartitionTypeInferredCorrectly() { createTableWithTwoPartitions("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, date Date, dept String) USING iceberg PARTITIONED BY (date, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, date Date, dept String", "PARTITIONED BY (date, dept)"); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('date', '2021-01-01'))", @@ -534,18 +534,15 @@ public void addDataPartitionedVerifyPartitionTypeInferredCorrectly() { public void addFilteredPartitionsToPartitioned() { createCompositePartitionedTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept 
String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -557,18 +554,15 @@ public void addFilteredPartitionsToPartitioned() { public void addFilteredPartitionsToPartitioned2() { createCompositePartitionedTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('dept', 'hr'))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(6L, 3L)), result); + assertOutput(result, 6L, 3L); assertEquals( "Iceberg table contains correct data", @@ -582,18 +576,15 @@ public void addFilteredPartitionsToPartitioned2() { public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnId() { createCompositePartitionedTableWithNullValueInPartitionColumn("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -605,18 +596,15 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnId() { public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { createCompositePartitionedTableWithNullValueInPartitionColumn("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('dept', 'hr'))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(6L, 3L)), result); + assertOutput(result, 6L, 3L); assertEquals( "Iceberg table contains correct data", @@ -630,18 +618,15 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { public void addWeirdCaseHiveTable() { createWeirdCaseTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, `naMe` String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (`naMe`)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, `naMe` String, dept String, subdept String", "PARTITIONED BY (`naMe`)"); List result = sql( "CALL %s.system.add_files('%s', '%s', map('naMe', 'John Doe'))", 
catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); /* While we would like to use @@ -681,17 +666,15 @@ public void addWeirdCaseHiveTable() { public void addPartitionToPartitionedHive() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '%s', map('id', 1))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -703,10 +686,7 @@ public void addPartitionToPartitionedHive() { public void invalidDataImport() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); Assertions.assertThatThrownBy( () -> @@ -729,10 +709,8 @@ public void invalidDataImport() { public void invalidDataImportPartitioned() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); Assertions.assertThatThrownBy( () -> @@ -758,10 +736,8 @@ public void invalidDataImportPartitioned() { public void addTwice() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result1 = sql( @@ -770,7 +746,7 @@ public void addTwice() { + "source_table => '%s', " + "partition_filter => map('id', 1))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result1); + assertOutput(result1, 2L, 1L); List result2 = sql( @@ -779,7 +755,7 @@ public void addTwice() { + "source_table => '%s', " + "partition_filter => map('id', 2))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result2); + assertOutput(result2, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -795,10 +771,8 @@ public void addTwice() { public void duplicateDataPartitioned() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); sql( "CALL %s.system.add_files(" @@ -825,10 +799,8 @@ public void duplicateDataPartitioned() { public void duplicateDataPartitionedAllowed() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name 
String, dept String, subdept String", "PARTITIONED BY (id)"); List result1 = sql( @@ -838,7 +810,7 @@ public void duplicateDataPartitionedAllowed() { + "partition_filter => map('id', 1))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result1); + assertOutput(result1, 2L, 1L); List result2 = sql( @@ -849,7 +821,7 @@ public void duplicateDataPartitionedAllowed() { + "check_duplicate_files => false)", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result2); + assertOutput(result2, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -864,10 +836,7 @@ public void duplicateDataPartitionedAllowed() { public void duplicateDataUnpartitioned() { createUnpartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); @@ -886,14 +855,11 @@ public void duplicateDataUnpartitioned() { public void duplicateDataUnpartitionedAllowed() { createUnpartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); List result1 = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result1); + assertOutput(result1, 2L, 1L); List result2 = sql( @@ -902,7 +868,7 @@ public void duplicateDataUnpartitionedAllowed() { + "source_table => '%s'," + "check_duplicate_files => false)", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result2); + assertOutput(result2, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -914,17 +880,14 @@ public void duplicateDataUnpartitionedAllowed() { @Test public void testEmptyImportDoesNotThrow() { - - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); // Empty path based import List pathResult = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L)), pathResult); + assertOutput(pathResult, 0L, 0L); assertEquals( "Iceberg table contains no added data when importing from an empty path", emptyQueryResult, @@ -937,7 +900,7 @@ public void testEmptyImportDoesNotThrow() { List tableResult = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L)), tableResult); + assertOutput(tableResult, 0L, 0L); assertEquals( "Iceberg table contains no added data when importing from an empty table", emptyQueryResult, @@ -954,10 +917,8 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { "ALTER TABLE %s ADD PARTITION (id = '%d') LOCATION '%d'", sourceTableName, emptyPartitionId, emptyPartitionId); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept 
String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List tableResult = sql( @@ -967,7 +928,7 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { + "partition_filter => map('id', %d))", catalogName, tableName, sourceTableName, emptyPartitionId); - assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L)), tableResult); + assertOutput(tableResult, 0L, 0L); assertEquals( "Iceberg table contains no added data when importing from an empty table", emptyQueryResult, @@ -983,38 +944,45 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { new StructField("subdept", DataTypes.StringType, true, Metadata.empty()) }; - private static final Dataset unpartitionedDF = - spark - .createDataFrame( - ImmutableList.of( - RowFactory.create(1, "John Doe", "hr", "communications"), - RowFactory.create(2, "Jane Doe", "hr", "salary"), - RowFactory.create(3, "Matt Doe", "hr", "communications"), - RowFactory.create(4, "Will Doe", "facilities", "all")), - new StructType(struct)) - .repartition(1); - - private static final Dataset singleNullRecordDF = - spark - .createDataFrame( - ImmutableList.of(RowFactory.create(null, null, null, null)), new StructType(struct)) - .repartition(1); - - private static final Dataset partitionedDF = - unpartitionedDF.select("name", "dept", "subdept", "id"); - - private static final Dataset compositePartitionedDF = - unpartitionedDF.select("name", "subdept", "id", "dept"); - - private static final Dataset compositePartitionedNullRecordDF = - singleNullRecordDF.select("name", "subdept", "id", "dept"); - - private static final Dataset weirdColumnNamesDF = - unpartitionedDF.select( - unpartitionedDF.col("id"), - unpartitionedDF.col("subdept"), - unpartitionedDF.col("dept"), - unpartitionedDF.col("name").as("naMe")); + private Dataset unpartitionedDF() { + return spark + .createDataFrame( + ImmutableList.of( + RowFactory.create(1, "John Doe", "hr", "communications"), + RowFactory.create(2, "Jane Doe", "hr", "salary"), + RowFactory.create(3, "Matt Doe", "hr", "communications"), + RowFactory.create(4, "Will Doe", "facilities", "all")), + new StructType(struct)) + .repartition(1); + } + + private Dataset singleNullRecordDF() { + return spark + .createDataFrame( + ImmutableList.of(RowFactory.create(null, null, null, null)), new StructType(struct)) + .repartition(1); + } + + private Dataset partitionedDF() { + return unpartitionedDF().select("name", "dept", "subdept", "id"); + } + + private Dataset compositePartitionedDF() { + return unpartitionedDF().select("name", "subdept", "id", "dept"); + } + + private Dataset compositePartitionedNullRecordDF() { + return singleNullRecordDF().select("name", "subdept", "id", "dept"); + } + + private Dataset weirdColumnNamesDF() { + Dataset unpartitionedDF = unpartitionedDF(); + return unpartitionedDF.select( + unpartitionedDF.col("id"), + unpartitionedDF.col("subdept"), + unpartitionedDF.col("dept"), + unpartitionedDF.col("name").as("naMe")); + } private static final StructField[] dateStruct = { new StructField("id", DataTypes.IntegerType, true, Metadata.empty()), @@ -1027,24 +995,26 @@ private static java.sql.Date toDate(String value) { return new java.sql.Date(DateTime.parse(value).getMillis()); } - private static final Dataset dateDF = - spark - .createDataFrame( - ImmutableList.of( - RowFactory.create(1, "John Doe", toDate("2021-01-01"), "01"), - RowFactory.create(2, "Jane 
Doe", toDate("2021-01-01"), "01"), - RowFactory.create(3, "Matt Doe", toDate("2021-01-02"), "02"), - RowFactory.create(4, "Will Doe", toDate("2021-01-02"), "02")), - new StructType(dateStruct)) - .repartition(2); + private Dataset dateDF() { + return spark + .createDataFrame( + ImmutableList.of( + RowFactory.create(1, "John Doe", toDate("2021-01-01"), "01"), + RowFactory.create(2, "Jane Doe", toDate("2021-01-01"), "01"), + RowFactory.create(3, "Matt Doe", toDate("2021-01-02"), "02"), + RowFactory.create(4, "Will Doe", toDate("2021-01-02"), "02")), + new StructType(dateStruct)) + .repartition(2); + } private void createUnpartitionedFileTable(String format) { String createParquet = "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING %s LOCATION '%s'"; sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - unpartitionedDF.write().insertInto(sourceTableName); - unpartitionedDF.write().insertInto(sourceTableName); + Dataset df = unpartitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createPartitionedFileTable(String format) { @@ -1054,8 +1024,9 @@ private void createPartitionedFileTable(String format) { sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - partitionedDF.write().insertInto(sourceTableName); - partitionedDF.write().insertInto(sourceTableName); + Dataset df = partitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createCompositePartitionedTable(String format) { @@ -1064,8 +1035,9 @@ private void createCompositePartitionedTable(String format) { + "PARTITIONED BY (id, dept) LOCATION '%s'"; sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - compositePartitionedDF.write().insertInto(sourceTableName); - compositePartitionedDF.write().insertInto(sourceTableName); + Dataset df = compositePartitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createCompositePartitionedTableWithNullValueInPartitionColumn(String format) { @@ -1075,8 +1047,8 @@ private void createCompositePartitionedTableWithNullValueInPartitionColumn(Strin sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); Dataset unionedDF = - compositePartitionedDF - .unionAll(compositePartitionedNullRecordDF) + compositePartitionedDF() + .unionAll(compositePartitionedNullRecordDF()) .select("name", "subdept", "id", "dept") .repartition(1); @@ -1091,8 +1063,9 @@ private void createWeirdCaseTable() { sql(createParquet, sourceTableName); - weirdColumnNamesDF.write().insertInto(sourceTableName); - weirdColumnNamesDF.write().insertInto(sourceTableName); + Dataset df = weirdColumnNamesDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createUnpartitionedHiveTable() { @@ -1101,8 +1074,9 @@ private void createUnpartitionedHiveTable() { sql(createHive, sourceTableName); - unpartitionedDF.write().insertInto(sourceTableName); - unpartitionedDF.write().insertInto(sourceTableName); + Dataset df = unpartitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createPartitionedHiveTable() { @@ -1112,8 +1086,9 @@ private void createPartitionedHiveTable() { sql(createHive, sourceTableName); - partitionedDF.write().insertInto(sourceTableName); - partitionedDF.write().insertInto(sourceTableName); + Dataset df = partitionedDF(); + 
df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createDatePartitionedFileTable(String format) { @@ -1123,7 +1098,7 @@ private void createDatePartitionedFileTable(String format) { sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - dateDF.select("id", "name", "ts").write().insertInto(sourceTableName); + dateDF().select("id", "name", "ts").write().insertInto(sourceTableName); } private void createTableWithTwoPartitions(String format) { @@ -1133,6 +1108,28 @@ private void createTableWithTwoPartitions(String format) { sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - dateDF.write().insertInto(sourceTableName); + dateDF().write().insertInto(sourceTableName); + } + + private void createIcebergTable(String schema) { + createIcebergTable(schema, ""); + } + + private void createIcebergTable(String schema, String partitioning) { + sql( + "CREATE TABLE %s (%s) USING iceberg %s TBLPROPERTIES ('%s' '%d')", + tableName, schema, partitioning, TableProperties.FORMAT_VERSION, formatVersion); + } + + private void assertOutput( + List result, long expectedAddedFilesCount, long expectedChangedPartitionCount) { + Object[] output = Iterables.getOnlyElement(result); + assertThat(output[0]).isEqualTo(expectedAddedFilesCount); + if (formatVersion == 1) { + assertThat(output[1]).isEqualTo(expectedChangedPartitionCount); + } else { + // the number of changed partitions may not be populated in v2 tables + assertThat(output[1]).isIn(expectedChangedPartitionCount, null); + } } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 88b752c3c6dd..51df02d56959 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -34,6 +34,7 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; @@ -42,6 +43,7 @@ import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.TableMigrationUtil; import org.apache.iceberg.exceptions.ValidationException; @@ -596,6 +598,8 @@ public static void importSparkPartitions( .collectAsList(); try { + TableOperations ops = ((HasTableOperations) targetTable).operations(); + int formatVersion = ops.current().formatVersion(); boolean snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean( targetTable.properties(), @@ -606,7 +610,7 @@ public static void importSparkPartitions( manifests.forEach(append::appendManifest); append.commit(); - if (!snapshotIdInheritanceEnabled) { + if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { // delete original manifests as they were rewritten before the commit deleteManifests(targetTable.io(), manifests); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java index b349694130d3..7ed239149d1c 100644 --- 
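The createIcebergTable helper shown above pins each test table to the parameterized format version. A hypothetical expansion of the DDL it produces for one v2 run (table name and schema are illustrative; TableProperties.FORMAT_VERSION is the 'format-version' table property):

String ddl = String.format(
    "CREATE TABLE %s (%s) USING iceberg %s TBLPROPERTIES ('%s' '%d')",
    "db.sample",                     // tableName (illustrative)
    "id Integer, name String",       // schema
    "PARTITIONED BY (id)",           // partitioning, or "" for unpartitioned tables
    TableProperties.FORMAT_VERSION,  // "format-version"
    2);                              // formatVersion test parameter
// -> CREATE TABLE db.sample (id Integer, name String) USING iceberg
//    PARTITIONED BY (id) TBLPROPERTIES ('format-version' '2')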
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java @@ -40,6 +40,7 @@ import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.connector.catalog.CatalogPlugin; @@ -71,7 +72,7 @@ class AddFilesProcedure extends BaseProcedure { new StructType( new StructField[] { new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()), - new StructField("changed_partition_count", DataTypes.LongType, false, Metadata.empty()), + new StructField("changed_partition_count", DataTypes.LongType, true, Metadata.empty()), }); private AddFilesProcedure(TableCatalog tableCatalog) { @@ -117,12 +118,18 @@ public InternalRow[] call(InternalRow args) { private InternalRow[] toOutputRows(Snapshot snapshot) { Map summary = snapshot.summary(); return new InternalRow[] { - newInternalRow( - Long.parseLong(summary.getOrDefault(SnapshotSummary.ADDED_FILES_PROP, "0")), - Long.parseLong(summary.getOrDefault(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "0"))) + newInternalRow(addedFilesCount(summary), changedPartitionCount(summary)) }; } + private long addedFilesCount(Map stats) { + return PropertyUtil.propertyAsLong(stats, SnapshotSummary.ADDED_FILES_PROP, 0L); + } + + private Long changedPartitionCount(Map stats) { + return PropertyUtil.propertyAsNullableLong(stats, SnapshotSummary.CHANGED_PARTITION_COUNT_PROP); + } + private boolean isFileIdentifier(Identifier ident) { String[] namespace = ident.namespace(); return namespace.length == 1 diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java index 63396e0969d6..3ed99da24947 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.extensions; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.File; import java.io.IOException; import java.util.List; @@ -35,7 +37,9 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; @@ -53,15 +57,42 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runners.Parameterized.Parameters; public class TestAddFilesProcedure extends SparkExtensionsTestBase { + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties(), + 1 
+ }, + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + 2 + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + 2 + } + }; + } + + private final int formatVersion; private final String sourceTableName = "source_table"; private File fileTableDir; public TestAddFilesProcedure( - String catalogName, String implementation, Map config) { + String catalogName, String implementation, Map config, int formatVersion) { super(catalogName, implementation, config); + this.formatVersion = formatVersion; } @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -85,17 +116,14 @@ public void dropTables() { public void addDataUnpartitioned() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -107,10 +135,7 @@ public void addDataUnpartitioned() { public void deleteAndAddBackUnpartitioned() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", @@ -124,7 +149,7 @@ public void deleteAndAddBackUnpartitioned() { "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -183,14 +208,13 @@ public void addAvroFile() throws Exception { dataFileWriter.append(record2); dataFileWriter.close(); - String createIceberg = "CREATE TABLE %s (id Long, data String) USING iceberg"; - sql(createIceberg, tableName); + createIcebergTable("id Long, data String"); List result = sql( "CALL %s.system.add_files('%s', '`avro`.`%s`')", catalogName, tableName, outputFile.getPath()); - assertEquals("Procedure output must match", ImmutableList.of(row(1L, 1L)), result); + assertOutput(result, 1L, 1L); List expected = Lists.newArrayList(new Object[] {1L, "a"}, new Object[] {2L, "b"}); @@ -236,15 +260,12 @@ public void addDataUnpartitionedAvro() { public void addDataUnpartitionedHive() { createUnpartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); List result = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -256,17 +277,14 @@ public void addDataUnpartitionedHive() { public void addDataUnpartitionedExtraCol() { createUnpartitionedFileTable("parquet"); - String 
createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String, foo string) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String, foo string"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -278,16 +296,14 @@ public void addDataUnpartitionedExtraCol() { public void addDataUnpartitionedMissingCol() { createUnpartitionedFileTable("parquet"); - String createIceberg = "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -299,17 +315,14 @@ public void addDataUnpartitionedMissingCol() { public void addDataPartitionedMissingCol() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(8L, 4L)), result); + assertOutput(result, 8L, 4L); assertEquals( "Iceberg table contains correct data", @@ -321,17 +334,15 @@ public void addDataPartitionedMissingCol() { public void addDataPartitioned() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(8L, 4L)), result); + assertOutput(result, 8L, 4L); assertEquals( "Iceberg table contains correct data", @@ -388,15 +399,13 @@ public void addDataPartitionedAvro() { public void addDataPartitionedHive() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(8L, 4L)), result); + assertOutput(result, 8L, 4L); assertEquals( "Iceberg table contains correct data", @@ -408,17 +417,15 @@ public void addDataPartitionedHive() { public void addPartitionToPartitioned() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - 
sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -430,10 +437,8 @@ public void addPartitionToPartitioned() { public void deleteAndAddBackPartitioned() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", @@ -447,7 +452,7 @@ public void deleteAndAddBackPartitioned() { "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -459,11 +464,11 @@ public void deleteAndAddBackPartitioned() { public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)" - + "TBLPROPERTIES ('%s'='true')"; - - sql(createIceberg, tableName, TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", + tableName, TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", @@ -491,17 +496,14 @@ public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() { public void addDataPartitionedByDateToPartitioned() { createDatePartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, date Date) USING iceberg PARTITIONED BY (date)"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, date Date", "PARTITIONED BY (date)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('date', '2021-01-01'))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -513,10 +515,8 @@ public void addDataPartitionedByDateToPartitioned() { public void addDataPartitionedVerifyPartitionTypeInferredCorrectly() { createTableWithTwoPartitions("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, date Date, dept String) USING iceberg PARTITIONED BY (date, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, date Date, dept String", "PARTITIONED BY (date, dept)"); sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('date', '2021-01-01'))", @@ -534,18 +534,15 @@ public void addDataPartitionedVerifyPartitionTypeInferredCorrectly() { public void addFilteredPartitionsToPartitioned() { createCompositePartitionedTable("parquet"); - String 
createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -557,18 +554,15 @@ public void addFilteredPartitionsToPartitioned() { public void addFilteredPartitionsToPartitioned2() { createCompositePartitionedTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('dept', 'hr'))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(6L, 3L)), result); + assertOutput(result, 6L, 3L); assertEquals( "Iceberg table contains correct data", @@ -582,18 +576,15 @@ public void addFilteredPartitionsToPartitioned2() { public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnId() { createCompositePartitionedTableWithNullValueInPartitionColumn("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -605,18 +596,15 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnId() { public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { createCompositePartitionedTableWithNullValueInPartitionColumn("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (id, dept)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id, dept)"); List result = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('dept', 'hr'))", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(6L, 3L)), result); + assertOutput(result, 6L, 3L); assertEquals( "Iceberg table contains correct data", @@ -630,18 +618,15 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { public void addWeirdCaseHiveTable() { createWeirdCaseTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, `naMe` String, dept String, subdept String) USING iceberg " - + "PARTITIONED BY (`naMe`)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, `naMe` String, dept String, subdept String", "PARTITIONED BY (`naMe`)"); List result = sql( "CALL 
%s.system.add_files('%s', '%s', map('naMe', 'John Doe'))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); /* While we would like to use @@ -681,17 +666,15 @@ public void addWeirdCaseHiveTable() { public void addPartitionToPartitionedHive() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result = sql( "CALL %s.system.add_files('%s', '%s', map('id', 1))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result); + assertOutput(result, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -703,10 +686,7 @@ public void addPartitionToPartitionedHive() { public void invalidDataImport() { createPartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); Assertions.assertThatThrownBy( () -> @@ -729,10 +709,8 @@ public void invalidDataImport() { public void invalidDataImportPartitioned() { createUnpartitionedFileTable("parquet"); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); Assertions.assertThatThrownBy( () -> @@ -758,10 +736,8 @@ public void invalidDataImportPartitioned() { public void addTwice() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result1 = sql( @@ -770,7 +746,7 @@ public void addTwice() { + "source_table => '%s', " + "partition_filter => map('id', 1))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result1); + assertOutput(result1, 2L, 1L); List result2 = sql( @@ -779,7 +755,7 @@ public void addTwice() { + "source_table => '%s', " + "partition_filter => map('id', 2))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result2); + assertOutput(result2, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -795,10 +771,8 @@ public void addTwice() { public void duplicateDataPartitioned() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); sql( "CALL %s.system.add_files(" @@ -825,10 +799,8 @@ public void duplicateDataPartitioned() { public void duplicateDataPartitionedAllowed() { createPartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - 
sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List result1 = sql( @@ -838,7 +810,7 @@ public void duplicateDataPartitionedAllowed() { + "partition_filter => map('id', 1))", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result1); + assertOutput(result1, 2L, 1L); List result2 = sql( @@ -849,7 +821,7 @@ public void duplicateDataPartitionedAllowed() { + "check_duplicate_files => false)", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result2); + assertOutput(result2, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -864,10 +836,7 @@ public void duplicateDataPartitionedAllowed() { public void duplicateDataUnpartitioned() { createUnpartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); @@ -886,14 +855,11 @@ public void duplicateDataUnpartitioned() { public void duplicateDataUnpartitionedAllowed() { createUnpartitionedHiveTable(); - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); List result1 = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result1); + assertOutput(result1, 2L, 1L); List result2 = sql( @@ -902,7 +868,7 @@ public void duplicateDataUnpartitionedAllowed() { + "source_table => '%s'," + "check_duplicate_files => false)", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(2L, 1L)), result2); + assertOutput(result2, 2L, 1L); assertEquals( "Iceberg table contains correct data", @@ -914,17 +880,14 @@ public void duplicateDataUnpartitionedAllowed() { @Test public void testEmptyImportDoesNotThrow() { - - String createIceberg = - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; - sql(createIceberg, tableName); + createIcebergTable("id Integer, name String, dept String, subdept String"); // Empty path based import List pathResult = sql( "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L)), pathResult); + assertOutput(pathResult, 0L, 0L); assertEquals( "Iceberg table contains no added data when importing from an empty path", emptyQueryResult, @@ -937,7 +900,7 @@ public void testEmptyImportDoesNotThrow() { List tableResult = sql("CALL %s.system.add_files('%s', '%s')", catalogName, tableName, sourceTableName); - assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L)), tableResult); + assertOutput(tableResult, 0L, 0L); assertEquals( "Iceberg table contains no added data when importing from an empty table", emptyQueryResult, @@ -954,10 +917,8 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { "ALTER TABLE %s ADD PARTITION (id = '%d') LOCATION '%d'", sourceTableName, emptyPartitionId, emptyPartitionId); - String createIceberg 
= - "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; - - sql(createIceberg, tableName); + createIcebergTable( + "id Integer, name String, dept String, subdept String", "PARTITIONED BY (id)"); List tableResult = sql( @@ -967,7 +928,7 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { + "partition_filter => map('id', %d))", catalogName, tableName, sourceTableName, emptyPartitionId); - assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L)), tableResult); + assertOutput(tableResult, 0L, 0L); assertEquals( "Iceberg table contains no added data when importing from an empty table", emptyQueryResult, @@ -983,38 +944,45 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { new StructField("subdept", DataTypes.StringType, true, Metadata.empty()) }; - private static final Dataset unpartitionedDF = - spark - .createDataFrame( - ImmutableList.of( - RowFactory.create(1, "John Doe", "hr", "communications"), - RowFactory.create(2, "Jane Doe", "hr", "salary"), - RowFactory.create(3, "Matt Doe", "hr", "communications"), - RowFactory.create(4, "Will Doe", "facilities", "all")), - new StructType(struct)) - .repartition(1); - - private static final Dataset singleNullRecordDF = - spark - .createDataFrame( - ImmutableList.of(RowFactory.create(null, null, null, null)), new StructType(struct)) - .repartition(1); - - private static final Dataset partitionedDF = - unpartitionedDF.select("name", "dept", "subdept", "id"); - - private static final Dataset compositePartitionedDF = - unpartitionedDF.select("name", "subdept", "id", "dept"); - - private static final Dataset compositePartitionedNullRecordDF = - singleNullRecordDF.select("name", "subdept", "id", "dept"); - - private static final Dataset weirdColumnNamesDF = - unpartitionedDF.select( - unpartitionedDF.col("id"), - unpartitionedDF.col("subdept"), - unpartitionedDF.col("dept"), - unpartitionedDF.col("name").as("naMe")); + private Dataset unpartitionedDF() { + return spark + .createDataFrame( + ImmutableList.of( + RowFactory.create(1, "John Doe", "hr", "communications"), + RowFactory.create(2, "Jane Doe", "hr", "salary"), + RowFactory.create(3, "Matt Doe", "hr", "communications"), + RowFactory.create(4, "Will Doe", "facilities", "all")), + new StructType(struct)) + .repartition(1); + } + + private Dataset singleNullRecordDF() { + return spark + .createDataFrame( + ImmutableList.of(RowFactory.create(null, null, null, null)), new StructType(struct)) + .repartition(1); + } + + private Dataset partitionedDF() { + return unpartitionedDF().select("name", "dept", "subdept", "id"); + } + + private Dataset compositePartitionedDF() { + return unpartitionedDF().select("name", "subdept", "id", "dept"); + } + + private Dataset compositePartitionedNullRecordDF() { + return singleNullRecordDF().select("name", "subdept", "id", "dept"); + } + + private Dataset weirdColumnNamesDF() { + Dataset unpartitionedDF = unpartitionedDF(); + return unpartitionedDF.select( + unpartitionedDF.col("id"), + unpartitionedDF.col("subdept"), + unpartitionedDF.col("dept"), + unpartitionedDF.col("name").as("naMe")); + } private static final StructField[] dateStruct = { new StructField("id", DataTypes.IntegerType, true, Metadata.empty()), @@ -1027,24 +995,26 @@ private static java.sql.Date toDate(String value) { return new java.sql.Date(DateTime.parse(value).getMillis()); } - private static final Dataset dateDF = - spark - .createDataFrame( - ImmutableList.of( - RowFactory.create(1, 
"John Doe", toDate("2021-01-01"), "01"), - RowFactory.create(2, "Jane Doe", toDate("2021-01-01"), "01"), - RowFactory.create(3, "Matt Doe", toDate("2021-01-02"), "02"), - RowFactory.create(4, "Will Doe", toDate("2021-01-02"), "02")), - new StructType(dateStruct)) - .repartition(2); + private Dataset dateDF() { + return spark + .createDataFrame( + ImmutableList.of( + RowFactory.create(1, "John Doe", toDate("2021-01-01"), "01"), + RowFactory.create(2, "Jane Doe", toDate("2021-01-01"), "01"), + RowFactory.create(3, "Matt Doe", toDate("2021-01-02"), "02"), + RowFactory.create(4, "Will Doe", toDate("2021-01-02"), "02")), + new StructType(dateStruct)) + .repartition(2); + } private void createUnpartitionedFileTable(String format) { String createParquet = "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING %s LOCATION '%s'"; sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - unpartitionedDF.write().insertInto(sourceTableName); - unpartitionedDF.write().insertInto(sourceTableName); + Dataset df = unpartitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createPartitionedFileTable(String format) { @@ -1054,8 +1024,9 @@ private void createPartitionedFileTable(String format) { sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - partitionedDF.write().insertInto(sourceTableName); - partitionedDF.write().insertInto(sourceTableName); + Dataset df = partitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createCompositePartitionedTable(String format) { @@ -1064,8 +1035,9 @@ private void createCompositePartitionedTable(String format) { + "PARTITIONED BY (id, dept) LOCATION '%s'"; sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); - compositePartitionedDF.write().insertInto(sourceTableName); - compositePartitionedDF.write().insertInto(sourceTableName); + Dataset df = compositePartitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createCompositePartitionedTableWithNullValueInPartitionColumn(String format) { @@ -1075,8 +1047,8 @@ private void createCompositePartitionedTableWithNullValueInPartitionColumn(Strin sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath()); Dataset unionedDF = - compositePartitionedDF - .unionAll(compositePartitionedNullRecordDF) + compositePartitionedDF() + .unionAll(compositePartitionedNullRecordDF()) .select("name", "subdept", "id", "dept") .repartition(1); @@ -1091,8 +1063,9 @@ private void createWeirdCaseTable() { sql(createParquet, sourceTableName); - weirdColumnNamesDF.write().insertInto(sourceTableName); - weirdColumnNamesDF.write().insertInto(sourceTableName); + Dataset df = weirdColumnNamesDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createUnpartitionedHiveTable() { @@ -1101,8 +1074,9 @@ private void createUnpartitionedHiveTable() { sql(createHive, sourceTableName); - unpartitionedDF.write().insertInto(sourceTableName); - unpartitionedDF.write().insertInto(sourceTableName); + Dataset df = unpartitionedDF(); + df.write().insertInto(sourceTableName); + df.write().insertInto(sourceTableName); } private void createPartitionedHiveTable() { @@ -1112,8 +1086,9 @@ private void createPartitionedHiveTable() { sql(createHive, sourceTableName); - partitionedDF.write().insertInto(sourceTableName); - 
-    partitionedDF.write().insertInto(sourceTableName);
+    Dataset<Row> df = partitionedDF();
+    df.write().insertInto(sourceTableName);
+    df.write().insertInto(sourceTableName);
   }
 
   private void createDatePartitionedFileTable(String format) {
@@ -1123,7 +1098,7 @@ private void createDatePartitionedFileTable(String format) {
 
     sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath());
 
-    dateDF.select("id", "name", "ts").write().insertInto(sourceTableName);
+    dateDF().select("id", "name", "ts").write().insertInto(sourceTableName);
   }
 
   private void createTableWithTwoPartitions(String format) {
@@ -1133,6 +1108,28 @@ private void createTableWithTwoPartitions(String format) {
 
     sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath());
 
-    dateDF.write().insertInto(sourceTableName);
+    dateDF().write().insertInto(sourceTableName);
+  }
+
+  private void createIcebergTable(String schema) {
+    createIcebergTable(schema, "");
+  }
+
+  private void createIcebergTable(String schema, String partitioning) {
+    sql(
+        "CREATE TABLE %s (%s) USING iceberg %s TBLPROPERTIES ('%s' '%d')",
+        tableName, schema, partitioning, TableProperties.FORMAT_VERSION, formatVersion);
+  }
+
+  private void assertOutput(
+      List<Object[]> result, long expectedAddedFilesCount, long expectedChangedPartitionCount) {
+    Object[] output = Iterables.getOnlyElement(result);
+    assertThat(output[0]).isEqualTo(expectedAddedFilesCount);
+    if (formatVersion == 1) {
+      assertThat(output[1]).isEqualTo(expectedChangedPartitionCount);
+    } else {
+      // the number of changed partitions may not be populated in v2 tables
+      assertThat(output[1]).isIn(expectedChangedPartitionCount, null);
+    }
   }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 88b752c3c6dd..51df02d56959 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
@@ -42,6 +43,7 @@
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.TableMigrationUtil;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -596,6 +598,8 @@ public static void importSparkPartitions(
             .collectAsList();
 
     try {
+      TableOperations ops = ((HasTableOperations) targetTable).operations();
+      int formatVersion = ops.current().formatVersion();
       boolean snapshotIdInheritanceEnabled =
           PropertyUtil.propertyAsBoolean(
               targetTable.properties(),
@@ -606,7 +610,7 @@ public static void importSparkPartitions(
       manifests.forEach(append::appendManifest);
       append.commit();
 
-      if (!snapshotIdInheritanceEnabled) {
+      if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
         // delete original manifests as they were rewritten before the commit
         deleteManifests(targetTable.io(), manifests);
       }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
index b349694130d3..7ed239149d1c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
 import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
@@ -71,7 +72,7 @@ class AddFilesProcedure extends BaseProcedure {
       new StructType(
           new StructField[] {
             new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()),
-            new StructField("changed_partition_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("changed_partition_count", DataTypes.LongType, true, Metadata.empty()),
           });
 
   private AddFilesProcedure(TableCatalog tableCatalog) {
@@ -117,12 +118,18 @@ public InternalRow[] call(InternalRow args) {
   private InternalRow[] toOutputRows(Snapshot snapshot) {
     Map<String, String> summary = snapshot.summary();
     return new InternalRow[] {
-      newInternalRow(
-          Long.parseLong(summary.getOrDefault(SnapshotSummary.ADDED_FILES_PROP, "0")),
-          Long.parseLong(summary.getOrDefault(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "0")))
+      newInternalRow(addedFilesCount(summary), changedPartitionCount(summary))
     };
   }
 
+  private long addedFilesCount(Map<String, String> stats) {
+    return PropertyUtil.propertyAsLong(stats, SnapshotSummary.ADDED_FILES_PROP, 0L);
+  }
+
+  private Long changedPartitionCount(Map<String, String> stats) {
+    return PropertyUtil.propertyAsNullableLong(stats, SnapshotSummary.CHANGED_PARTITION_COUNT_PROP);
+  }
+
   private boolean isFileIdentifier(Identifier ident) {
     String[] namespace = ident.namespace();
     return namespace.length == 1