Spark 3.4: Push down system functions by V2 filters for rewriting DataFiles and PositionDeleteFiles (#8560)
dramaticlly authored Sep 18, 2023
1 parent d6bc248 commit 2817dd4
Showing 4 changed files with 114 additions and 21 deletions.
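This commit lets the rewrite_data_files procedure accept predicates on Iceberg system functions (bucket, truncate, years, months, days, hours) by translating the resolved Spark expression through DataSource V2 predicates instead of V1 filters. As a rough, editorial sketch of how this is exercised from a Spark job — the catalog name my_catalog and table db.events are placeholders, not part of this commit, and a dedicated Iceberg catalog is assumed because, per the tests below, the Spark session catalog still resolves these functions to v1:

// Illustrative sketch only: "my_catalog" and "db.events" are placeholder names, not part
// of this commit. Requires an Iceberg SparkCatalog (not the Spark session catalog, which
// still resolves system functions to v1 per the test assumptions below).
import org.apache.spark.sql.SparkSession;

public class RewriteBucketZero {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("rewrite-bucket-zero")
        .getOrCreate();

    // Compact only the data files whose c2 value hashes into bucket 0 of bucket(2, c2);
    // the bucket() predicate is now pushed down as a V2 filter instead of being rejected.
    spark.sql(
        "CALL my_catalog.system.rewrite_data_files("
            + "table => 'db.events', "
            + "where => 'my_catalog.system.bucket(2, c2) = 0')")
        .show();

    spark.stop();
  }
}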
File 1 of 4: rewrite_data_files procedure tests (Java)
@@ -35,6 +35,7 @@
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -395,6 +396,38 @@ public void testRewriteDataFilesWithFilterOnPartitionTable() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithFilterOnOnBucketExpression() {
// currently the Spark session catalog only resolves to v1 functions instead of the desired v2 functions
// https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2070-L2083
Assume.assumeFalse(catalogName.equals(SparkCatalogConfig.SPARK.catalogName()));
createBucketPartitionTable();
// create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
insertData(10);
List<Object[]> expectedRecords = currentData();

// select only 5 files for compaction (files in the partition c2 = 'bar')
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s',"
+ " where => '%s.system.bucket(2, c2) = 0')",
catalogName, tableIdent, catalogName);

assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing bucket(c2) = 0) and add 1 data files",
row(5, 1),
row(output.get(0)[0], output.get(0)[1]));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithInFilterOnPartitionTable() {
createPartitionTable();
@@ -480,7 +513,6 @@ public void testRewriteDataFilesWithAllPossibleFilters() {
sql(
"CALL %s.system.rewrite_data_files(table => '%s'," + " where => 'c2 like \"%s\"')",
catalogName, tableIdent, "car%");

// TODO: Enable when org.apache.iceberg.spark.SparkFilters have implementations for
// StringEndsWith & StringContains
// StringEndsWith
@@ -491,6 +523,39 @@ public void testRewriteDataFilesWithAllPossibleFilters() {
// " where => 'c2 like \"%s\"')", catalogName, tableIdent, "%car%");
}

@Test
public void testRewriteDataFilesWithPossibleV2Filters() {
// currently the Spark session catalog only resolves to v1 functions instead of the desired v2 functions
// https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2070-L2083
Assume.assumeFalse(catalogName.equals(SparkCatalogConfig.SPARK.catalogName()));

SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "id");
sql(
"CALL %s.system.rewrite_data_files(table => '%s',"
+ " where => '%s.system.bucket(2, data) >= 0')",
catalogName, tableIdent, catalogName);
sql(
"CALL %s.system.rewrite_data_files(table => '%s',"
+ " where => '%s.system.truncate(4, id) >= 1')",
catalogName, tableIdent, catalogName);
sql(
"CALL %s.system.rewrite_data_files(table => '%s',"
+ " where => '%s.system.years(ts) >= 1')",
catalogName, tableIdent, catalogName);
sql(
"CALL %s.system.rewrite_data_files(table => '%s',"
+ " where => '%s.system.months(ts) >= 1')",
catalogName, tableIdent, catalogName);
sql(
"CALL %s.system.rewrite_data_files(table => '%s',"
+ " where => '%s.system.days(ts) >= date(\"2023-01-01\")')",
catalogName, tableIdent, catalogName);
sql(
"CALL %s.system.rewrite_data_files(table => '%s',"
+ " where => '%s.system.hours(ts) >= 1')",
catalogName, tableIdent, catalogName);
}
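
For reference, the where clauses above are resolved by Spark and converted into Iceberg expressions before the rewrite runs. Below is a sketch of roughly equivalent Iceberg-side filters, assuming the transform term helpers in org.apache.iceberg.expressions.Expressions (bucket is used the same way in the action-level test later in this commit). Column names (id, data, ts) follow the helper table created by SystemFunctionPushDownHelper, and this is not the literal output of the converter:

// Sketch of roughly equivalent Iceberg expressions for the pushed-down predicates above;
// not the literal converter output. Column names (id, data, ts) follow the helper table.
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class TransformPredicateSketch {
  public static void main(String[] args) {
    Expression bucketGe =
        Expressions.greaterThanOrEqual(Expressions.bucket("data", 2), 0);
    Expression truncateGe =
        Expressions.greaterThanOrEqual(Expressions.truncate("id", 4), 1);
    Expression yearGe = Expressions.greaterThanOrEqual(Expressions.year("ts"), 1);
    Expression monthGe = Expressions.greaterThanOrEqual(Expressions.month("ts"), 1);
    Expression hourGe = Expressions.greaterThanOrEqual(Expressions.hour("ts"), 1);

    for (Expression expr : new Expression[] {bucketGe, truncateGe, yearGe, monthGe, hourGe}) {
      System.out.println(expr);
    }
  }
}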

@Test
public void testRewriteDataFilesWithInvalidInputs() {
createTable();
@@ -778,6 +843,17 @@ private void createPartitionTable() {
TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
}

private void createBucketPartitionTable() {
sql(
"CREATE TABLE %s (c1 int, c2 string, c3 string) "
+ "USING iceberg "
+ "PARTITIONED BY (bucket(2, c2)) "
+ "TBLPROPERTIES ('%s' '%s')",
tableName,
TableProperties.WRITE_DISTRIBUTION_MODE,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
}

private void insertData(int filesCount) {
insertData(tableName, filesCount);
}
File 2 of 4: rewrite_data_files procedure implementation (Java)
@@ -24,20 +24,17 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -103,8 +100,6 @@ public InternalRow[] call(InternalRow args) {
return modifyIcebergTable(
tableIdent,
table -> {
String quotedFullIdentifier =
Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
RewriteDataFiles action = actions().rewriteDataFiles(table);

String strategy = args.isNullAt(1) ? null : args.getString(1);
@@ -120,7 +115,7 @@ public InternalRow[] call(InternalRow args) {

String where = args.isNullAt(4) ? null : args.getString(4);

action = checkAndApplyFilter(action, where, quotedFullIdentifier);
action = checkAndApplyFilter(action, where, tableIdent);

RewriteDataFiles.Result result = action.execute();

@@ -129,15 +124,10 @@
}

private RewriteDataFiles checkAndApplyFilter(
RewriteDataFiles action, String where, String tableName) {
RewriteDataFiles action, String where, Identifier ident) {
if (where != null) {
try {
Expression expression =
SparkExpressionConverter.collectResolvedSparkExpression(spark(), tableName, where);
return action.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
} catch (AnalysisException e) {
throw new IllegalArgumentException("Cannot parse predicates in where option: " + where, e);
}
Expression expression = filterExpression(ident, where);
return action.filter(expression);
}
return action;
}
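
The inline parsing above has moved into a filterExpression(Identifier, String) helper, which lives in a shared base class that is not part of this diff. A hedged sketch of what such a helper could look like, built only from the calls that appear in the removed lines (Spark3Util.quotedFullIdentifier, SparkExpressionConverter.collectResolvedSparkExpression and convertToIcebergExpression); the real implementation may differ:

// Sketch only: the actual filterExpression(...) helper is defined in a base class outside
// this diff and may be implemented differently. This mirrors the code removed above.
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;

class WhereClauseSketch {
  static Expression toIcebergFilter(
      SparkSession spark, String catalogName, Identifier ident, String where) {
    try {
      // Resolve the predicate against the fully qualified table, then convert the
      // resolved Catalyst expression to an Iceberg expression via the V2 predicate path.
      String table = Spark3Util.quotedFullIdentifier(catalogName, ident);
      org.apache.spark.sql.catalyst.expressions.Expression resolved =
          SparkExpressionConverter.collectResolvedSparkExpression(spark, table, where);
      return SparkExpressionConverter.convertToIcebergExpression(resolved);
    } catch (AnalysisException e) {
      throw new IllegalArgumentException("Cannot parse predicates in where option: " + where, e);
    }
  }
}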
File 3 of 4: SparkExpressionConverter (Scala)
Expand Up @@ -19,7 +19,7 @@

package org.apache.spark.sql.execution.datasources

import org.apache.iceberg.spark.SparkFilters
import org.apache.iceberg.spark.SparkV2Filters
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -28,14 +28,15 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy

object SparkExpressionConverter {

def convertToIcebergExpression(sparkExpression: Expression): org.apache.iceberg.expressions.Expression = {
// Currently, it is a double conversion as we are converting Spark expression to Spark filter
// and then converting Spark filter to Iceberg expression.
// Currently, it is a double conversion as we are converting a Spark expression to a Spark predicate
// and then converting the Spark predicate to an Iceberg expression.
// But these two conversions already exist and are well tested. So, we are going with this approach.
SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true).get)
SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
}

@throws[AnalysisException]
File 4 of 4: RewriteDataFiles action tests (Java)
@@ -223,6 +223,32 @@ public void testBinPackWithFilter() {
assertEquals("Rows must match", expectedRecords, actualRecords);
}

@Test
public void testBinPackWithFilterOnBucketExpression() {
Table table = createTablePartitioned(4, 2);

shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
long dataSizeBefore = testDataSize(table);

Result result =
basicRewrite(table)
.filter(Expressions.equal("c1", 1))
.filter(Expressions.equal(Expressions.bucket("c2", 2), 0))
.execute();

assertThat(result)
.extracting(Result::rewrittenDataFilesCount, Result::addedDataFilesCount)
.as("Action should rewrite 2 data files into 1 data file")
.contains(2, 1);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);

shouldHaveFiles(table, 7);

List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);
}

@Test
public void testBinPackAfterPartitionChange() {
Table table = createTable();
@@ -260,7 +286,7 @@ public void testBinPackAfterPartitionChange() {
}

@Test
public void testBinPackWithDeletes() throws Exception {
public void testBinPackWithDeletes() {
Table table = createTablePartitioned(4, 2);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
shouldHaveFiles(table, 8);