From c3db7ea30cfad2321750162df5d593f5ad58e65e Mon Sep 17 00:00:00 2001
From: manuzhang
Date: Tue, 2 Jan 2024 10:12:27 +0800
Subject: [PATCH] Spark 3.5: Support filtering with buckets in RewriteDataFilesProcedure

---
 .palantir/revapi.yml                          | 13 ++++++
 .../org/apache/iceberg/BatchScanAdapter.java  | 10 +++++
 .../main/java/org/apache/iceberg/Scan.java    |  4 ++
 .../iceberg/actions/RewriteDataFiles.java     |  4 ++
 .../java/org/apache/iceberg/BaseScan.java     | 11 +++++
 .../org/apache/iceberg/DataTableScan.java     |  1 +
 .../org/apache/iceberg/TableScanContext.java  |  9 +++++
 .../TestRewriteDataFilesProcedure.java        | 40 +++++++++++++++++++
 .../actions/RewriteDataFilesSparkAction.java  |  8 ++++
 .../procedures/RewriteDataFilesProcedure.java | 27 ++++++++++++-
 10 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 990c9ba31afd..eefea2b0a572 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -873,6 +873,19 @@ acceptedBreaks:
       new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
       justification: "Static utility class - should not have public constructor"
   "1.4.0":
+    org.apache.iceberg:iceberg-api:
+    - code: "java.method.addedToInterface"
+      new: "method ThisT org.apache.iceberg.Scan<ThisT, T extends org.apache.iceberg.ScanTask, G extends org.apache.iceberg.ScanTaskGroup<T>>::partitionFilter(org.apache.iceberg.expressions.Expression)"
+      justification: "add partitionFilter interface"
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.actions.RewriteDataFiles org.apache.iceberg.actions.RewriteDataFiles::partitionFilter(org.apache.iceberg.expressions.Expression)"
+      justification: "add partitionFilter interface"
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.expressions.Expression org.apache.iceberg.Scan<ThisT, T extends org.apache.iceberg.ScanTask, G extends org.apache.iceberg.ScanTaskGroup<T>>::partitionFilter()"
+      justification: "add partitionFilter interface"
     org.apache.iceberg:iceberg-core:
     - code: "java.class.defaultSerializationChanged"
       old: "class org.apache.iceberg.mapping.NameMapping"
diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
index 02b3d241d893..03bf674e20df 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -103,6 +103,16 @@ public Expression filter() {
     return scan.filter();
   }
 
+  @Override
+  public BatchScan partitionFilter(Expression expr) {
+    return new BatchScanAdapter(scan.partitionFilter(expr));
+  }
+
+  @Override
+  public Expression partitionFilter() {
+    return scan.partitionFilter();
+  }
+
   @Override
   public BatchScan ignoreResiduals() {
     return new BatchScanAdapter(scan.ignoreResiduals());
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index 339bc75336ba..2bd2d61e5eef 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -127,6 +127,10 @@ default ThisT select(String... columns) {
    */
  Expression filter();
 
+  ThisT partitionFilter(Expression expr);
+
+  Expression partitionFilter();
+
   /**
    * Create a new scan from this that applies data filtering to files but not to rows in those
    * files.
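For review context: the two new Scan methods mirror the existing filter(Expression)/filter() pair, one refining the scan and one exposing the accumulated expression. A minimal usage sketch, not part of the patch, assuming a table partitioned by bucket(2, c2) so the partition field is named "c2_bucket" (the helper name and filter values are illustrative):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.io.CloseableIterable;

    static CloseableIterable<FileScanTask> planBucketFiles(Table table) {
      // The row filter prunes by data values; the new partition filter prunes
      // whole partitions before file planning.
      return table.newScan()
          .filter(Expressions.greaterThan("c1", 0))            // existing row filter
          .partitionFilter(Expressions.equal("c2_bucket", 1))  // added by this patch
          .planFiles();
    }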
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index 854b0993513e..9540a7d50a5d 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -171,6 +171,10 @@ default RewriteDataFiles zOrder(String... columns) {
    */
   RewriteDataFiles filter(Expression expression);
 
+  default RewriteDataFiles partitionFilter(Expression expression) {
+    throw new UnsupportedOperationException("Partition Filter not implemented for this framework");
+  }
+
   /**
    * A map of file group information to the results of rewriting that file group. If the results are
    * null then that particular file group failed. We should only have failed groups if partial
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 8c309cf69e6d..46d1c1808ba9 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -200,6 +200,17 @@ public Expression filter() {
     return context().rowFilter();
   }
 
+  @Override
+  public ThisT partitionFilter(Expression expr) {
+    return newRefinedScan(
+        table, schema, context.filterPartitions(Expressions.and(context.partitionFilter(), expr)));
+  }
+
+  @Override
+  public Expression partitionFilter() {
+    return context().partitionFilter();
+  }
+
   @Override
   public ThisT ignoreResiduals() {
     return newRefinedScan(table, schema, context.ignoreResiduals(true));
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 8463112b7a51..d71dc5392852 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -74,6 +74,7 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
         .caseSensitive(isCaseSensitive())
         .select(scanColumns())
         .filterData(filter())
+        .filterPartitions(partitionFilter())
         .specsById(table().specs())
         .scanMetrics(scanMetrics())
         .ignoreDeleted()
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index 5722ed7d8c1c..8dc697dc48b8 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -45,6 +45,11 @@ public Expression rowFilter() {
     return Expressions.alwaysTrue();
   }
 
+  @Value.Default
+  public Expression partitionFilter() {
+    return Expressions.alwaysTrue();
+  }
+
   @Value.Default
   public boolean ignoreResiduals() {
     return false;
@@ -111,6 +116,10 @@ TableScanContext filterRows(Expression filter) {
     return ImmutableTableScanContext.builder().from(this).rowFilter(filter).build();
   }
 
+  TableScanContext filterPartitions(Expression filter) {
+    return ImmutableTableScanContext.builder().from(this).partitionFilter(filter).build();
+  }
+
   TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) {
     return ImmutableTableScanContext.builder()
         .from(this)
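Note how the core pieces compose: TableScanContext defaults partitionFilter() to alwaysTrue(), BaseScan ANDs each new expression into the context, and DataTableScan forwards the result to manifest planning via filterPartitions. A hedged illustration of that accumulation (the second partition field is hypothetical):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.expressions.Expressions;

    static TableScan narrowScan(Table table) {
      // Successive refinements accumulate; the resulting partition filter is
      // equivalent to c2_bucket = 1 AND c3_bucket = 0.
      return table.newScan()
          .partitionFilter(Expressions.equal("c2_bucket", 1))
          .partitionFilter(Expressions.equal("c3_bucket", 0));
    }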
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 25e506a85a7f..6388aa645f04 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -456,6 +456,35 @@ public void testRewriteDataFilesWithInFilterOnPartitionTable() {
     assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesWithPartitionFilterOnBucketTable() {
+    createBucketTable();
+    // create 5 files for each partition (c2_bucket = 0 and c2_bucket = 1)
+    insertData(10);
+    List<Object[]> expectedRecords = currentData();
+
+    // select only 5 files for compaction (files in the partition c2_bucket = 1)
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s',"
+                + " buckets => map('c2_bucket', '1'))",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 5 data files from the single matching partition"
+            + " (containing c2_bucket = 1) and add 1 data file",
+        row(5, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(4);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testRewriteDataFilesWithAllPossibleFilters() {
     createPartitionTable();
@@ -874,6 +903,17 @@ private void createBucketPartitionTable() {
         TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
   }
 
+  private void createBucketTable() {
+    sql(
+        "CREATE TABLE %s (c1 int, c2 string, c3 string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (bucket(2, c2)) "
+            + "TBLPROPERTIES ('%s' '%s')",
+        tableName,
+        TableProperties.WRITE_DISTRIBUTION_MODE,
+        TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
+  }
+
   private void insertData(int filesCount) {
     insertData(tableName, filesCount);
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 6b5628a1f4b5..6d60ee6a50cb 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -89,6 +89,7 @@ public class RewriteDataFilesSparkAction
   private final Table table;
 
   private Expression filter = Expressions.alwaysTrue();
+  private Expression partitionFilter = Expressions.alwaysTrue();
   private int maxConcurrentFileGroupRewrites;
   private int maxCommits;
   private boolean partialProgressEnabled;
@@ -146,6 +147,12 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
     return this;
   }
 
+  @Override
+  public RewriteDataFilesSparkAction partitionFilter(Expression expression) {
+    partitionFilter = Expressions.and(partitionFilter, expression);
+    return this;
+  }
+
   @Override
   public RewriteDataFiles.Result execute() {
     if (table.currentSnapshot() == null) {
@@ -185,6 +192,7 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
             .newScan()
             .useSnapshot(startingSnapshotId)
             .filter(filter)
+            .partitionFilter(partitionFilter)
             .ignoreResiduals()
             .planFiles();
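The action mirrors the scan API: filter() and partitionFilter() accumulate independently and are applied together in planFileGroups. A sketch of programmatic use through SparkActions, not part of the patch (table and field names are placeholders):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    static RewriteDataFiles.Result rewriteOneBucket(SparkSession spark, Table table) {
      return SparkActions.get(spark)
          .rewriteDataFiles(table)
          .filter(Expressions.equal("c3", "bar"))              // data filter (existing)
          .partitionFilter(Expressions.equal("c2_bucket", 1))  // partition filter (new)
          .execute();
    }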
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 14246b47ac4e..38aca5e04a51 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.NamedReference;
 import org.apache.iceberg.expressions.Zorder;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -54,7 +55,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
         ProcedureParameter.optional("strategy", DataTypes.StringType),
         ProcedureParameter.optional("sort_order", DataTypes.StringType),
         ProcedureParameter.optional("options", STRING_MAP),
-        ProcedureParameter.optional("where", DataTypes.StringType)
+        ProcedureParameter.optional("where", DataTypes.StringType),
+        ProcedureParameter.optional("buckets", STRING_MAP)
       };
 
   // counts are not nullable since the action result is never null
@@ -117,6 +119,10 @@ public InternalRow[] call(InternalRow args) {
 
           action = checkAndApplyFilter(action, where, tableIdent);
 
+          if (!args.isNullAt(5)) {
+            action = checkAndApplyBuckets(args, action);
+          }
+
           RewriteDataFiles.Result result = action.execute();
 
           return toOutputRows(result);
@@ -132,6 +138,25 @@ private RewriteDataFiles checkAndApplyFilter(
     return action;
   }
 
+  private RewriteDataFiles checkAndApplyBuckets(InternalRow args, RewriteDataFiles action) {
+    Map<String, String> buckets = Maps.newHashMap();
+    args.getMap(5)
+        .foreach(
+            DataTypes.StringType,
+            DataTypes.StringType,
+            (k, v) -> {
+              buckets.put(k.toString(), v.toString());
+              return BoxedUnit.UNIT;
+            });
+    org.apache.iceberg.expressions.Expression expression = Expressions.alwaysTrue();
+    for (Map.Entry<String, String> bucket : buckets.entrySet()) {
+      expression =
+          Expressions.and(
+              expression, Expressions.equal(bucket.getKey(), Integer.parseInt(bucket.getValue())));
+    }
+    return action.partitionFilter(expression);
+  }
+
   private RewriteDataFiles checkAndApplyOptions(InternalRow args, RewriteDataFiles action) {
     Map<String, String> options = Maps.newHashMap();
     args.getMap(3)
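End to end, each entry in the buckets map becomes one equality predicate on a partition field, and checkAndApplyBuckets ANDs them into a single partition filter. A usage sketch mirroring the new test (catalog and table identifiers are placeholders):

    import org.apache.spark.sql.SparkSession;

    static void compactOneBucket(SparkSession spark) {
      // Rewrites only files in the c2_bucket = 1 partition; equivalent to
      // calling partitionFilter(Expressions.equal("c2_bucket", 1)) on the action.
      spark.sql(
          "CALL spark_catalog.system.rewrite_data_files("
              + "table => 'db.sample', "
              + "buckets => map('c2_bucket', '1'))");
    }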