From 2817dd4af905b5c18eb6aaf84a36a08fa012dc55 Mon Sep 17 00:00:00 2001
From: Hongyue/Steve Zhang
Date: Mon, 18 Sep 2023 13:14:53 -0700
Subject: [PATCH] Spark 3.4: Push down system functions by V2 filters for
 rewriting DataFiles and PositionDeleteFiles (#8560)

---
 .../TestRewriteDataFilesProcedure.java        | 78 ++++++++++++++++++-
 .../procedures/RewriteDataFilesProcedure.java | 20 ++--
 .../SparkExpressionConverter.scala            |  9 ++-
 .../actions/TestRewriteDataFilesAction.java   | 28 ++++++-
 4 files changed, 114 insertions(+), 21 deletions(-)

diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 801396718137..2449c20ab9a9 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.spark.ExtendedParser;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkTableCache;
+import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -395,6 +396,38 @@ public void testRewriteDataFilesWithFilterOnPartitionTable() {
     assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesWithFilterOnBucketExpression() {
+    // currently the Spark session catalog only resolves to V1 functions instead of the desired V2 functions
+    // https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2070-L2083
+    Assume.assumeFalse(catalogName.equals(SparkCatalogConfig.SPARK.catalogName()));
+    createBucketPartitionTable();
+    // create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
+    insertData(10);
+    List<Object[]> expectedRecords = currentData();
+
+    // select only 5 files for compaction (files in the partition c2 = 'bar')
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s',"
+                + " where => '%s.system.bucket(2, c2) = 0')",
+            catalogName, tableIdent, catalogName);
+
+    assertEquals(
+        "Action should rewrite 5 data files from the single matching partition"
+            + " (containing bucket(c2) = 0) and add 1 data file",
+        row(5, 1),
+        row(output.get(0)[0], output.get(0)[1]));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(4);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteDataFilesWithInFilterOnPartitionTable() {
     createPartitionTable();
@@ -480,7 +513,6 @@ public void testRewriteDataFilesWithAllPossibleFilters() {
     sql(
         "CALL %s.system.rewrite_data_files(table => '%s'," + " where => 'c2 like \"%s\"')",
         catalogName, tableIdent, "car%");
-
     // TODO: Enable when org.apache.iceberg.spark.SparkFilters have implementations for
     // StringEndsWith & StringContains
     // StringEndsWith
     // sql(
     //     "CALL %s.system.rewrite_data_files(table => '%s'," +
     //         " where => 'c2 like \"%s\"')", catalogName, tableIdent, "%car%");
   }
 
+  @Test
+  public void testRewriteDataFilesWithPossibleV2Filters() {
+    // currently the Spark session catalog only resolves to V1 functions instead of the desired V2 functions
+    // https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2070-L2083
+    Assume.assumeFalse(catalogName.equals(SparkCatalogConfig.SPARK.catalogName()));
+
+    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "id");
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.bucket(2, data) >= 0')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.truncate(4, id) >= 1')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.years(ts) >= 1')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.months(ts) >= 1')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.days(ts) >= date(\"2023-01-01\")')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.hours(ts) >= 1')",
+        catalogName, tableIdent, catalogName);
+  }
+
   @Test
   public void testRewriteDataFilesWithInvalidInputs() {
     createTable();
@@ -778,6 +843,17 @@ private void createPartitionTable() {
         TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
   }
 
+  private void createBucketPartitionTable() {
+    sql(
+        "CREATE TABLE %s (c1 int, c2 string, c3 string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (bucket(2, c2)) "
+            + "TBLPROPERTIES ('%s' '%s')",
+        tableName,
+        TableProperties.WRITE_DISTRIBUTION_MODE,
+        TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
+  }
+
   private void insertData(int filesCount) {
     insertData(tableName, filesCount);
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 07e3f6232c95..14246b47ac4e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -24,20 +24,17 @@ import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.NamedReference;
 import org.apache.iceberg.expressions.Zorder;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.ExtendedParser;
-import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.Expression;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
-import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
@@ -103,8 +100,6 @@ public InternalRow[] call(InternalRow args) {
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          String quotedFullIdentifier =
-              Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
           RewriteDataFiles action = actions().rewriteDataFiles(table);
 
           String strategy = args.isNullAt(1) ? null : args.getString(1);
@@ -120,7 +115,7 @@ public InternalRow[] call(InternalRow args) {
 
           String where = args.isNullAt(4) ? null : args.getString(4);
 
-          action = checkAndApplyFilter(action, where, quotedFullIdentifier);
+          action = checkAndApplyFilter(action, where, tableIdent);
 
           RewriteDataFiles.Result result = action.execute();
 
@@ -129,15 +124,10 @@ public InternalRow[] call(InternalRow args) {
   }
 
   private RewriteDataFiles checkAndApplyFilter(
-      RewriteDataFiles action, String where, String tableName) {
+      RewriteDataFiles action, String where, Identifier ident) {
     if (where != null) {
-      try {
-        Expression expression =
-            SparkExpressionConverter.collectResolvedSparkExpression(spark(), tableName, where);
-        return action.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
-      } catch (AnalysisException e) {
-        throw new IllegalArgumentException("Cannot parse predicates in where option: " + where, e);
-      }
+      Expression expression = filterExpression(ident, where);
+      return action.filter(expression);
     }
     return action;
   }
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 9f53eae60aba..4903a100f97f 100644
--- a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.iceberg.spark.SparkFilters
+import org.apache.iceberg.spark.SparkV2Filters
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -28,14 +28,15 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
 
 object SparkExpressionConverter {
 
   def convertToIcebergExpression(sparkExpression: Expression): org.apache.iceberg.expressions.Expression = {
-    // Currently, it is a double conversion as we are converting Spark expression to Spark filter
-    // and then converting Spark filter to Iceberg expression.
+    // Currently, it is a double conversion as we are converting Spark expression to Spark predicate
+    // and then converting Spark predicate to Iceberg expression.
     // But these two conversions already exist and well tested. So, we are going with this approach.
-    SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true).get)
+    SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
   }
 
   @throws[AnalysisException]
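Note: with the changes above, checkAndApplyFilter no longer round-trips through SparkExpressionConverter directly; the where string is resolved against the table identifier and, per the converter change, translated through DataSourceV2Strategy.translateFilterV2 and SparkV2Filters into an Iceberg expression handed to RewriteDataFiles.filter(..). The sketch below only illustrates rough Iceberg-side equivalents of a few predicates used in the tests (column names come from the tests); the actual conversion happens inside SparkV2Filters, and the removed V1 Filter path could not represent function calls such as bucket(2, c2) at all:

    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.expressions.Expressions;

    class SystemFunctionFilterSketch {
      // rough equivalent of: where => '<catalog>.system.bucket(2, c2) = 0'
      static RewriteDataFiles bucketFilter(RewriteDataFiles action) {
        return action.filter(Expressions.equal(Expressions.bucket("c2", 2), 0));
      }

      // rough equivalent of: where => '<catalog>.system.truncate(4, id) >= 1'
      static RewriteDataFiles truncateFilter(RewriteDataFiles action) {
        return action.filter(Expressions.greaterThanOrEqual(Expressions.truncate("id", 4), 1));
      }

      // rough equivalent of: where => '<catalog>.system.years(ts) >= 1'
      static RewriteDataFiles yearFilter(RewriteDataFiles action) {
        return action.filter(Expressions.greaterThanOrEqual(Expressions.year("ts"), 1));
      }

      private SystemFunctionFilterSketch() {}
    }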
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 1581e5836478..bfffa65accac 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -223,6 +223,32 @@ public void testBinPackWithFilter() {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testBinPackWithFilterOnBucketExpression() {
+    Table table = createTablePartitioned(4, 2);
+
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    long dataSizeBefore = testDataSize(table);
+
+    Result result =
+        basicRewrite(table)
+            .filter(Expressions.equal("c1", 1))
+            .filter(Expressions.equal(Expressions.bucket("c2", 2), 0))
+            .execute();
+
+    assertThat(result)
+        .extracting(Result::rewrittenDataFilesCount, Result::addedDataFilesCount)
+        .as("Action should rewrite 2 data files into 1 data file")
+        .contains(2, 1);
+    assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
+
+    shouldHaveFiles(table, 7);
+
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testBinPackAfterPartitionChange() {
     Table table = createTable();
@@ -260,7 +286,7 @@ public void testBinPackAfterPartitionChange() {
   }
 
   @Test
-  public void testBinPackWithDeletes() throws Exception {
+  public void testBinPackWithDeletes() {
     Table table = createTablePartitioned(4, 2);
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
     shouldHaveFiles(table, 8);
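Note: testBinPackWithFilterOnBucketExpression exercises the same capability through the public actions API. A minimal usage sketch along those lines is given below; the catalog/table name and the Spark3Util loading path are placeholders and not part of this patch:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.spark.Spark3Util;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    public class RewriteBucketZero {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("rewrite-bucket-zero").getOrCreate();
        Table table = Spark3Util.loadIcebergTable(spark, "my_catalog.db.sample");

        // Compact only files that belong to bucket 0 of the bucket(2, c2) partition transform.
        RewriteDataFiles.Result result =
            SparkActions.get(spark)
                .rewriteDataFiles(table)
                .filter(Expressions.equal(Expressions.bucket("c2", 2), 0))
                .execute();

        System.out.printf(
            "rewrote %d data files into %d data files%n",
            result.rewrittenDataFilesCount(), result.addedDataFilesCount());
        spark.stop();
      }
    }

Passing a transform-based expression to RewriteDataFiles.filter(..) is the action-level counterpart of the where clause pushdown added for the procedure.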