Skip to content

Commit

Permalink
[ARORO-3289] Avoid calling getMixedTablePartitionSpecById in the scan…
Browse files Browse the repository at this point in the history
… loop (#3290)

Add getSpec to TableFileScanHelper

trigger CI

Add getSpec to TableFileScanHelper
  • Loading branch information
7hong authored Dec 6, 2024
1 parent ccda634 commit 530b700
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.amoro.table.KeyedTableSnapshot;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableSnapshot;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
Expand Down Expand Up @@ -121,9 +120,7 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
try (CloseableIterable<TableFileScanHelper.FileScanResult> results =
tableFileScanHelper.scan()) {
for (TableFileScanHelper.FileScanResult fileScanResult : results) {
PartitionSpec partitionSpec =
MixedTableUtil.getMixedTablePartitionSpecById(
mixedTable, fileScanResult.file().specId());
PartitionSpec partitionSpec = tableFileScanHelper.getSpec(fileScanResult.file().specId());
StructLike partition = fileScanResult.file().partition();
String partitionPath = partitionSpec.partitionToPath(partition);
PartitionEvaluator evaluator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

import java.util.Map;

public class IcebergTableFileScanHelper implements TableFileScanHelper {
private final Table table;
private Expression partitionFilter = Expressions.alwaysTrue();
private final long snapshotId;
private final Map<Integer, PartitionSpec> specs;

public IcebergTableFileScanHelper(Table table, long snapshotId) {
this.table = table;
this.snapshotId = snapshotId;
this.specs = table.specs();
}

@Override
Expand All @@ -61,4 +66,9 @@ public TableFileScanHelper withPartitionFilter(Expression partitionFilter) {
this.partitionFilter = partitionFilter;
return this;
}

@Override
public PartitionSpec getSpec(int specId) {
return specs.get(specId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ public class KeyedTableFileScanHelper implements TableFileScanHelper {
private final long changeSnapshotId;
private final long baseSnapshotId;
private Expression partitionFilter = Expressions.alwaysTrue();
private final PartitionSpec spec;

public KeyedTableFileScanHelper(KeyedTable keyedTable, KeyedTableSnapshot snapshot) {
this.keyedTable = keyedTable;
this.baseSnapshotId = snapshot.baseSnapshotId();
this.changeSnapshotId = snapshot.changeSnapshotId();
this.spec = keyedTable.spec();
}

/**
Expand Down Expand Up @@ -441,4 +443,13 @@ public void setMinTransactionIdAfter(long minTransactionIdAfter) {
this.minTransactionIdAfter = minTransactionIdAfter;
}
}

@Override
public PartitionSpec getSpec(int specId) {
if (specId != spec.specId()) {
throw new IllegalArgumentException(
"Partition spec id " + specId + " not found in table " + keyedTable.name());
}
return spec;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;

Expand Down Expand Up @@ -47,4 +48,6 @@ public List<ContentFile<?>> deleteFiles() {
CloseableIterable<FileScanResult> scan();

TableFileScanHelper withPartitionFilter(Expression partitionFilter);

PartitionSpec getSpec(int specId);
}

0 comments on commit 530b700

Please sign in to comment.