
Spark 3.5: Support filtering with buckets in RewriteDataFilesProcedure
manuzhang committed Jan 2, 2024
1 parent 604422b commit 0d0ee02
Showing 10 changed files with 127 additions and 1 deletion.
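In short, the procedure gains an optional buckets map argument (partition field name to bucket value) that is translated into a partition filter for the rewrite action. A minimal usage sketch from Spark, mirroring the new test below (the catalog and table names are placeholders):

    // Compact only the files in the c2_bucket = 1 partition of a bucket-partitioned table
    spark.sql(
        "CALL my_catalog.system.rewrite_data_files("
            + "table => 'db.sample', buckets => map('c2_bucket', '1'))");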
13 changes: 13 additions & 0 deletions .palantir/revapi.yml
@@ -873,6 +873,19 @@ acceptedBreaks:
new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
justification: "Static utility class - should not have public constructor"
"1.4.0":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
new: "method ThisT org.apache.iceberg.Scan<ThisT, T extends org.apache.iceberg.ScanTask,\
\ G extends org.apache.iceberg.ScanTaskGroup<T extends org.apache.iceberg.ScanTask>>::partitionFilter(org.apache.iceberg.expressions.Expression)"
justification: "add partitionFilter interface"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.actions.RewriteDataFiles org.apache.iceberg.actions.RewriteDataFiles::partitionFilter(org.apache.iceberg.expressions.Expression)"
justification: "add partitionFilter interface"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.expressions.Expression org.apache.iceberg.Scan<ThisT,\
\ T extends org.apache.iceberg.ScanTask, G extends org.apache.iceberg.ScanTaskGroup<T\
\ extends org.apache.iceberg.ScanTask>>::partitionFilter()"
justification: "add partitionFilter interface"
org.apache.iceberg:iceberg-core:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.mapping.NameMapping"
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -103,6 +103,16 @@ public Expression filter() {
return scan.filter();
}

@Override
public BatchScan partitionFilter(Expression expr) {
return new BatchScanAdapter(scan.partitionFilter(expr));
}

@Override
public Expression partitionFilter() {
return scan.partitionFilter();
}

@Override
public BatchScan ignoreResiduals() {
return new BatchScanAdapter(scan.ignoreResiduals());
4 changes: 4 additions & 0 deletions api/src/main/java/org/apache/iceberg/Scan.java
@@ -127,6 +127,10 @@ default ThisT select(String... columns) {
*/
Expression filter();

/** Create a new scan from this that also applies the given partition filter during planning. */
ThisT partitionFilter(Expression expr);

/** Returns this scan's partition filter {@link Expression}. */
Expression partitionFilter();

/**
* Create a new scan from this that applies data filtering to files but not to rows in those
* files.
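Taken together with the existing row filter, the new Scan methods let callers prune whole partitions during planning. A hedged caller-side sketch, assuming a table handle for a table partitioned with a c2_bucket field (both placeholders):

    // Plan only the files in the c2_bucket = 1 partition, still filtering rows by c1.
    // Successive partitionFilter calls are ANDed together (see BaseScan below).
    TableScan scan =
        table
            .newScan()
            .filter(Expressions.greaterThan("c1", 0))
            .partitionFilter(Expressions.equal("c2_bucket", 1));
    scan.planFiles().forEach(task -> System.out.println(task.file().path()));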
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -171,6 +171,11 @@ default RewriteDataFiles zOrder(String... columns) {
*/
RewriteDataFiles filter(Expression expression);

/** A user-provided partition filter for selecting the partitions to rewrite. */
default RewriteDataFiles partitionFilter(Expression expression) {
  throw new UnsupportedOperationException(
      "Partition filter is not implemented for this framework");
}

/**
* A map of file group information to the results of rewriting that file group. If the results are
* null then that particular file group failed. We should only have failed groups if partial
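At the action level the same filter can be set directly. A sketch against the Spark implementation further down, assuming SparkActions.get() and a loaded table handle (placeholders):

    // Rewrite only the data files in the c2_bucket = 1 partition.
    RewriteDataFiles.Result result =
        SparkActions.get()
            .rewriteDataFiles(table)
            .partitionFilter(Expressions.equal("c2_bucket", 1))
            .execute();
    System.out.println(result.rewrittenDataFilesCount() + " files rewritten");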
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -200,6 +200,11 @@ public Expression filter() {
return context().rowFilter();
}

@Override
public ThisT partitionFilter(Expression expr) {
return newRefinedScan(
table, schema, context.filterPartitions(Expressions.and(context.partitionFilter(), expr)));
}

@Override
public Expression partitionFilter() {
return context().partitionFilter();
}

@Override
public ThisT ignoreResiduals() {
return newRefinedScan(table, schema, context.ignoreResiduals(true));
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -74,6 +74,7 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
.caseSensitive(isCaseSensitive())
.select(scanColumns())
.filterData(filter())
.filterPartitions(partitionFilter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted()
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -45,6 +45,11 @@ public Expression rowFilter() {
return Expressions.alwaysTrue();
}

@Value.Default
public Expression partitionFilter() {
return Expressions.alwaysTrue();
}

@Value.Default
public boolean ignoreResiduals() {
return false;
@@ -111,6 +116,10 @@ TableScanContext filterRows(Expression filter) {
return ImmutableTableScanContext.builder().from(this).rowFilter(filter).build();
}

TableScanContext filterPartitions(Expression filter) {
return ImmutableTableScanContext.builder().from(this).partitionFilter(filter).build();
}

TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) {
return ImmutableTableScanContext.builder()
.from(this)
40 changes: 40 additions & 0 deletions spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -456,6 +456,35 @@ public void testRewriteDataFilesWithInFilterOnPartitionTable() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithPartitionFilterOnBucketTable() {
createBucketTable();
// create 5 files for each partition (c2_bucket = 0 and c2_bucket = 1)
insertData(10);
List<Object[]> expectedRecords = currentData();

// select only 5 files for compaction (files in the partition c2_bucket = 1)
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s',"
+ " buckets => map('c2_bucket', '1'))",
catalogName, tableIdent);

assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2_bucket = 1) and add 1 data files",
row(5, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithAllPossibleFilters() {
createPartitionTable();
@@ -874,6 +903,17 @@ private void createBucketPartitionTable() {
TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
}

private void createBucketTable() {
sql(
"CREATE TABLE %s (c1 int, c2 string, c3 string) "
+ "USING iceberg "
+ "PARTITIONED BY (bucket(2, c2)) "
+ "TBLPROPERTIES ('%s' '%s')",
tableName,
TableProperties.WRITE_DISTRIBUTION_MODE,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
}

private void insertData(int filesCount) {
insertData(tableName, filesCount);
}
8 changes: 8 additions & 0 deletions spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -89,6 +89,7 @@ public class RewriteDataFilesSparkAction
private final Table table;

private Expression filter = Expressions.alwaysTrue();
private Expression partitionFilter = Expressions.alwaysTrue();
private int maxConcurrentFileGroupRewrites;
private int maxCommits;
private boolean partialProgressEnabled;
@@ -146,6 +147,12 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
return this;
}

@Override
public RewriteDataFilesSparkAction partitionFilter(Expression expression) {
partitionFilter = Expressions.and(partitionFilter, expression);
return this;
}

@Override
public RewriteDataFiles.Result execute() {
if (table.currentSnapshot() == null) {
@@ -185,6 +192,7 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
.newScan()
.useSnapshot(startingSnapshotId)
.filter(filter)
.partitionFilter(partitionFilter)
.ignoreResiduals()
.planFiles();

27 changes: 26 additions & 1 deletion spark/v3.5/spark-extensions/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -25,6 +25,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -54,7 +55,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
ProcedureParameter.optional("strategy", DataTypes.StringType),
ProcedureParameter.optional("sort_order", DataTypes.StringType),
ProcedureParameter.optional("options", STRING_MAP),
ProcedureParameter.optional("where", DataTypes.StringType)
ProcedureParameter.optional("where", DataTypes.StringType),
ProcedureParameter.optional("buckets", STRING_MAP)
};

// counts are not nullable since the action result is never null
@@ -117,6 +119,10 @@ public InternalRow[] call(InternalRow args) {

action = checkAndApplyFilter(action, where, tableIdent);

if (!args.isNullAt(5)) {
action = checkAndApplyBuckets(args, action);
}

RewriteDataFiles.Result result = action.execute();

return toOutputRows(result);
@@ -132,6 +138,25 @@ private RewriteDataFiles checkAndApplyFilter(
return action;
}

private RewriteDataFiles checkAndApplyBuckets(InternalRow args, RewriteDataFiles action) {
Map<String, String> buckets = Maps.newHashMap();
args.getMap(5)
.foreach(
DataTypes.StringType,
DataTypes.StringType,
(k, v) -> {
buckets.put(k.toString(), v.toString());
return BoxedUnit.UNIT;
});
Expression expression = Expressions.alwaysTrue();
for (Map.Entry<String, String> bucket : buckets.entrySet()) {
expression =
Expressions.and(
expression, Expressions.equal(bucket.getKey(), Integer.parseInt(bucket.getValue())));
}
return action.partitionFilter(expression);
}

private RewriteDataFiles checkAndApplyOptions(InternalRow args, RewriteDataFiles action) {
Map<String, String> options = Maps.newHashMap();
args.getMap(3)
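For reference, checkAndApplyBuckets folds the buckets map into a conjunction of equality predicates on the partition field names before handing it to partitionFilter. A worked sketch for a hypothetical two-entry call, buckets => map('c2_bucket', '1', 'c3_bucket', '0'):

    // What the loop above builds for the two-entry map:
    Expression expr = Expressions.alwaysTrue();
    expr = Expressions.and(expr, Expressions.equal("c2_bucket", 1));
    expr = Expressions.and(expr, Expressions.equal("c3_bucket", 0));
    // equivalent to: c2_bucket = 1 AND c3_bucket = 0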
