Skip to content

Commit

Permalink
[AMORO-3220] Optimize table summary metrics to fetch data (#3221)
Browse files Browse the repository at this point in the history
Optimize table summary metrics to fetch data
  • Loading branch information
MarigWeizhi authored Sep 24, 2024
1 parent c879fdb commit b157261
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.amoro.utils.TablePropertyUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -177,6 +180,13 @@ public PendingInput getPendingInput() {
if (!isInitialized) {
initEvaluator();
}
// Dangling delete files make the file statistics collected through TableScan
// diverge from the Iceberg snapshot summary, so for Iceberg tables the
// table-wide totals are taken from the snapshot summary instead.
if (TableFormat.ICEBERG == mixedTable.format()) {
Snapshot snapshot = mixedTable.asUnkeyedTable().snapshot(currentSnapshot.snapshotId());
return new PendingInput(partitionPlanMap.values(), snapshot);
}

return new PendingInput(partitionPlanMap.values());
}

Expand All @@ -191,37 +201,73 @@ public static class PendingInput {

@JsonIgnore private final Map<Integer, Set<StructLike>> partitions = Maps.newHashMap();

private int totalFileCount = 0;
private long totalFileSize = 0L;
private long totalFileRecords = 0L;
private int dataFileCount = 0;
private long dataFileSize = 0;
private long dataFileRecords = 0;
private long dataFileSize = 0L;
private long dataFileRecords = 0L;
private int equalityDeleteFileCount = 0;
private int positionalDeleteFileCount = 0;
private long positionalDeleteBytes = 0L;
private long equalityDeleteBytes = 0L;
private long equalityDeleteFileRecords = 0L;
private long positionalDeleteFileRecords = 0L;
private int danglingDeleteFileCount = 0;
private int healthScore = -1; // -1 means not calculated

public PendingInput() {}

public PendingInput(Collection<PartitionEvaluator> evaluators) {
initialize(evaluators);
totalFileCount = dataFileCount + positionalDeleteFileCount + equalityDeleteFileCount;
totalFileSize = dataFileSize + positionalDeleteBytes + equalityDeleteBytes;
totalFileRecords = dataFileRecords + positionalDeleteFileRecords + equalityDeleteFileRecords;
}

/**
 * Builds a summary whose table-wide totals come from an Iceberg snapshot summary.
 *
 * <p>Per-kind counters (data, positional delete, equality delete) are still accumulated from the
 * evaluators. The total file count, total file size and total records are read from the snapshot
 * summary, falling back to 0 when a property is absent. The dangling delete file count is the
 * number of delete files reported by the summary minus the delete files seen by the evaluators.
 *
 * @param evaluators partition evaluators whose statistics are accumulated
 * @param snapshot snapshot whose summary supplies the table-wide totals
 */
public PendingInput(Collection<PartitionEvaluator> evaluators, Snapshot snapshot) {
  initialize(evaluators);
  Map<String, String> summary = snapshot.summary();
  int totalDeleteFiles =
      PropertyUtil.propertyAsInt(summary, SnapshotSummary.TOTAL_DELETE_FILES_PROP, 0);
  int totalDataFiles =
      PropertyUtil.propertyAsInt(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
  // The record count is kept in a long field and can exceed Integer.MAX_VALUE on large tables;
  // parsing it as an int would throw NumberFormatException, so read it as a long.
  totalFileRecords = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_RECORDS_PROP, 0);
  totalFileSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
  totalFileCount = totalDeleteFiles + totalDataFiles;
  danglingDeleteFileCount =
      totalDeleteFiles - equalityDeleteFileCount - positionalDeleteFileCount;
}

private void initialize(Collection<PartitionEvaluator> evaluators) {
double totalHealthScore = 0;
for (PartitionEvaluator evaluator : evaluators) {
partitions
.computeIfAbsent(evaluator.getPartition().first(), ignore -> Sets.newHashSet())
.add(evaluator.getPartition().second());
dataFileCount += evaluator.getFragmentFileCount() + evaluator.getSegmentFileCount();
dataFileSize += evaluator.getFragmentFileSize() + evaluator.getSegmentFileSize();
dataFileRecords += evaluator.getFragmentFileRecords() + evaluator.getSegmentFileRecords();
positionalDeleteBytes += evaluator.getPosDeleteFileSize();
positionalDeleteFileRecords += evaluator.getPosDeleteFileRecords();
positionalDeleteFileCount += evaluator.getPosDeleteFileCount();
equalityDeleteBytes += evaluator.getEqualityDeleteFileSize();
equalityDeleteFileRecords += evaluator.getEqualityDeleteFileRecords();
equalityDeleteFileCount += evaluator.getEqualityDeleteFileCount();
addPartitionData(evaluator);
totalHealthScore += evaluator.getHealthScore();
}
healthScore = (int) Math.ceil(totalHealthScore / evaluators.size());
healthScore = avgHealthScore(totalHealthScore, evaluators.size());
}

/**
 * Folds one partition evaluator into this summary.
 *
 * <p>Records the evaluator's partition under the first component of its partition pair and adds
 * its data, positional delete and equality delete statistics to the running counters.
 *
 * @param evaluator the partition evaluator to accumulate
 */
private void addPartitionData(PartitionEvaluator evaluator) {
  // Track the partition value in the set keyed by the first element of the partition pair.
  Set<StructLike> knownPartitions =
      partitions.computeIfAbsent(evaluator.getPartition().first(), ignore -> Sets.newHashSet());
  knownPartitions.add(evaluator.getPartition().second());

  // Data files: fragment and segment files are counted together.
  dataFileCount += evaluator.getFragmentFileCount() + evaluator.getSegmentFileCount();
  dataFileSize += evaluator.getFragmentFileSize() + evaluator.getSegmentFileSize();
  dataFileRecords += evaluator.getFragmentFileRecords() + evaluator.getSegmentFileRecords();

  // Positional delete files.
  positionalDeleteFileCount += evaluator.getPosDeleteFileCount();
  positionalDeleteBytes += evaluator.getPosDeleteFileSize();
  positionalDeleteFileRecords += evaluator.getPosDeleteFileRecords();

  // Equality delete files.
  equalityDeleteFileCount += evaluator.getEqualityDeleteFileCount();
  equalityDeleteBytes += evaluator.getEqualityDeleteFileSize();
  equalityDeleteFileRecords += evaluator.getEqualityDeleteFileRecords();
}

/**
 * Averages the accumulated health score over the partitions, rounding up.
 *
 * @param totalHealthScore sum of the per-partition health scores
 * @param partitionCount number of partitions that contributed to the sum
 * @return 0 when there are no partitions, otherwise the ceiling of the average
 */
private int avgHealthScore(double totalHealthScore, int partitionCount) {
  // Short-circuit the empty case so no division by zero is performed.
  return partitionCount == 0 ? 0 : (int) Math.ceil(totalHealthScore / partitionCount);
}

public Map<Integer, Set<StructLike>> getPartitions() {
Expand Down Expand Up @@ -268,9 +314,28 @@ public int getHealthScore() {
return healthScore;
}

/**
 * Returns the total number of files in the table.
 *
 * <p>When built from evaluators only, this is data + positional delete + equality delete files;
 * when built with a snapshot, it is the snapshot summary's data files plus delete files.
 */
public int getTotalFileCount() {
return totalFileCount;
}

/**
 * Returns the total size of the files in the table.
 *
 * <p>When built from evaluators only, this is the data file size plus positional and equality
 * delete bytes; when built with a snapshot, it is the snapshot summary's total file size.
 */
public long getTotalFileSize() {
return totalFileSize;
}

/**
 * Returns the total record count of the table.
 *
 * <p>When built from evaluators only, this is data file records plus positional and equality
 * delete file records; when built with a snapshot, it is the snapshot summary's total records.
 */
public long getTotalFileRecords() {
return totalFileRecords;
}

/**
 * Returns the number of dangling delete files.
 *
 * <p>Only computed when the summary is built with a snapshot: the snapshot summary's delete file
 * count minus the equality and positional delete files seen by the evaluators. Otherwise 0.
 */
public int getDanglingDeleteFileCount() {
return danglingDeleteFileCount;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("totalFileCount", totalFileCount)
.add("totalFileSize", totalFileSize)
.add("totalFileRecords", totalFileRecords)
.add("partitions", partitions)
.add("dataFileCount", dataFileCount)
.add("dataFileSize", dataFileSize)
Expand All @@ -282,6 +347,7 @@ public String toString() {
.add("equalityDeleteFileRecords", equalityDeleteFileRecords)
.add("positionalDeleteFileRecords", positionalDeleteFileRecords)
.add("healthScore", healthScore)
.add("danglingDeleteFileCount", danglingDeleteFileCount)
.toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public class TableSummaryMetrics {
.withTags("catalog", "database", "table")
.build();

// Gauge for the number of dangling delete files, tagged by catalog, database and table.
public static final MetricDefine TABLE_SUMMARY_DANGLING_DELETE_FILES =
defineGauge("table_summary_dangling_delete_files")
.withDescription("Number of dangling delete files in the table")
.withTags("catalog", "database", "table")
.build();

// table summary files size metrics
public static final MetricDefine TABLE_SUMMARY_TOTAL_FILES_SIZE =
defineGauge("table_summary_total_files_size")
Expand Down Expand Up @@ -128,22 +134,10 @@ public class TableSummaryMetrics {

private final ServerTableIdentifier identifier;
private final List<MetricKey> registeredMetricKeys = Lists.newArrayList();
private OptimizingEvaluator.PendingInput tableSummary = new OptimizingEvaluator.PendingInput();
private MetricRegistry globalRegistry;

private long totalFiles = 0L;
private long dataFiles = 0L;
private long positionDeleteFiles = 0L;
private long equalityDeleteFiles = 0L;
private long totalFilesSize = 0L;
private long positionDeleteFilesSize = 0L;
private long dataFilesSize = 0L;
private long equalityDeleteFilesSize = 0L;
private long positionDeleteFilesRecords = 0L;
private long totalRecords = 0L;
private long dataFilesRecords = 0L;
private long equalityDeleteFilesRecords = 0L;
private long snapshots = 0L;
private long healthScore = -1L; // -1 means not calculated

public TableSummaryMetrics(ServerTableIdentifier identifier) {
this.identifier = identifier;
Expand All @@ -167,44 +161,72 @@ private void registerMetric(MetricRegistry registry, MetricDefine define, Metric
public void register(MetricRegistry registry) {
if (globalRegistry == null) {
// register files number metrics
registerMetric(registry, TABLE_SUMMARY_TOTAL_FILES, (Gauge<Long>) () -> totalFiles);
registerMetric(registry, TABLE_SUMMARY_DATA_FILES, (Gauge<Long>) () -> dataFiles);
registerMetric(
registry, TABLE_SUMMARY_POSITION_DELETE_FILES, (Gauge<Long>) () -> positionDeleteFiles);
registry,
TABLE_SUMMARY_TOTAL_FILES,
(Gauge<Long>) () -> (long) tableSummary.getTotalFileCount());
registerMetric(
registry,
TABLE_SUMMARY_DATA_FILES,
(Gauge<Long>) () -> (long) tableSummary.getDataFileCount());
registerMetric(
registry,
TABLE_SUMMARY_POSITION_DELETE_FILES,
(Gauge<Long>) () -> (long) tableSummary.getPositionalDeleteFileCount());
registerMetric(
registry,
TABLE_SUMMARY_EQUALITY_DELETE_FILES,
(Gauge<Long>) () -> (long) tableSummary.getEqualityDeleteFileCount());
registerMetric(
registry, TABLE_SUMMARY_EQUALITY_DELETE_FILES, (Gauge<Long>) () -> equalityDeleteFiles);
registry,
TABLE_SUMMARY_DANGLING_DELETE_FILES,
(Gauge<Long>) () -> (long) tableSummary.getDanglingDeleteFileCount());

// register files size metrics
registerMetric(registry, TABLE_SUMMARY_TOTAL_FILES_SIZE, (Gauge<Long>) () -> totalFilesSize);
registerMetric(registry, TABLE_SUMMARY_DATA_FILES_SIZE, (Gauge<Long>) () -> dataFilesSize);
registerMetric(
registry,
TABLE_SUMMARY_TOTAL_FILES_SIZE,
(Gauge<Long>) () -> tableSummary.getTotalFileSize());
registerMetric(
registry,
TABLE_SUMMARY_DATA_FILES_SIZE,
(Gauge<Long>) () -> tableSummary.getDataFileSize());
registerMetric(
registry,
TABLE_SUMMARY_POSITION_DELETE_FILES_SIZE,
(Gauge<Long>) () -> positionDeleteFilesSize);
(Gauge<Long>) () -> tableSummary.getPositionalDeleteBytes());
registerMetric(
registry,
TABLE_SUMMARY_EQUALITY_DELETE_FILES_SIZE,
(Gauge<Long>) () -> equalityDeleteFilesSize);
(Gauge<Long>) () -> tableSummary.getEqualityDeleteBytes());

// register files records metrics
registerMetric(registry, TABLE_SUMMARY_TOTAL_RECORDS, (Gauge<Long>) () -> totalRecords);
registerMetric(
registry, TABLE_SUMMARY_DATA_FILES_RECORDS, (Gauge<Long>) () -> dataFilesRecords);
registry,
TABLE_SUMMARY_TOTAL_RECORDS,
(Gauge<Long>) () -> tableSummary.getTotalFileRecords());
registerMetric(
registry,
TABLE_SUMMARY_DATA_FILES_RECORDS,
(Gauge<Long>) () -> tableSummary.getDataFileRecords());
registerMetric(
registry,
TABLE_SUMMARY_POSITION_DELETE_FILES_RECORDS,
(Gauge<Long>) () -> positionDeleteFilesRecords);
(Gauge<Long>) () -> tableSummary.getPositionalDeleteFileRecords());
registerMetric(
registry,
TABLE_SUMMARY_EQUALITY_DELETE_FILES_RECORDS,
(Gauge<Long>) () -> equalityDeleteFilesRecords);
(Gauge<Long>) () -> tableSummary.getEqualityDeleteFileRecords());

// register health score metric
registerMetric(
registry,
TABLE_SUMMARY_HEALTH_SCORE,
(Gauge<Long>) () -> (long) tableSummary.getHealthScore());

// register snapshots number metric
registerMetric(registry, TABLE_SUMMARY_SNAPSHOTS, (Gauge<Long>) () -> snapshots);

// register health score metric
registerMetric(registry, TABLE_SUMMARY_HEALTH_SCORE, (Gauge<Long>) () -> healthScore);

globalRegistry = registry;
}
}
Expand All @@ -219,31 +241,7 @@ public void refresh(OptimizingEvaluator.PendingInput tableSummary) {
if (tableSummary == null) {
return;
}
totalFiles =
tableSummary.getDataFileCount()
+ tableSummary.getEqualityDeleteFileCount()
+ tableSummary.getPositionalDeleteFileCount();
dataFiles = tableSummary.getDataFileCount();
positionDeleteFiles = tableSummary.getPositionalDeleteFileCount();
equalityDeleteFiles = tableSummary.getEqualityDeleteFileCount();

totalFilesSize =
tableSummary.getDataFileSize()
+ tableSummary.getEqualityDeleteBytes()
+ tableSummary.getPositionalDeleteBytes();
positionDeleteFilesSize = tableSummary.getPositionalDeleteBytes();
dataFilesSize = tableSummary.getDataFileSize();
equalityDeleteFilesSize = tableSummary.getEqualityDeleteBytes();

totalRecords =
tableSummary.getDataFileRecords()
+ tableSummary.getEqualityDeleteFileRecords()
+ tableSummary.getPositionalDeleteFileRecords();
positionDeleteFilesRecords = tableSummary.getPositionalDeleteFileRecords();
dataFilesRecords = tableSummary.getDataFileRecords();
equalityDeleteFilesRecords = tableSummary.getEqualityDeleteFileRecords();

healthScore = tableSummary.getHealthScore();
this.tableSummary = tableSummary;
}

public void refreshSnapshots(MixedTable table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.amoro.server.table;

import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_DANGLING_DELETE_FILES;
import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_DATA_FILES;
import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_DATA_FILES_RECORDS;
import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_DATA_FILES_SIZE;
Expand Down Expand Up @@ -154,6 +155,8 @@ public void testTableSummaryMetrics() {
Gauge<Long> dataFiles = getMetric(metrics, identifier, TABLE_SUMMARY_DATA_FILES);
Gauge<Long> posDelFiles = getMetric(metrics, identifier, TABLE_SUMMARY_POSITION_DELETE_FILES);
Gauge<Long> eqDelFiles = getMetric(metrics, identifier, TABLE_SUMMARY_EQUALITY_DELETE_FILES);
Gauge<Long> danglingDelFiles =
getMetric(metrics, identifier, TABLE_SUMMARY_DANGLING_DELETE_FILES);

Gauge<Long> totalSize = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES_SIZE);
Gauge<Long> dataSize = getMetric(metrics, identifier, TABLE_SUMMARY_DATA_FILES_SIZE);
Expand All @@ -177,6 +180,7 @@ public void testTableSummaryMetrics() {
Assertions.assertEquals(0, dataFiles.getValue());
Assertions.assertEquals(0, posDelFiles.getValue());
Assertions.assertEquals(0, eqDelFiles.getValue());
Assertions.assertEquals(0, danglingDelFiles.getValue());

Assertions.assertEquals(0, totalSize.getValue());
Assertions.assertEquals(0, dataSize.getValue());
Expand Down
1 change: 1 addition & 0 deletions docs/user-guides/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ Amoro has supported built-in metrics to measure status of table self-optimizing
| table_summary_data_files | Gauge | catalog, database, table | Number of data files in the table |
| table_summary_equality_delete_files | Gauge | catalog, database, table | Number of equality delete files in the table |
| table_summary_position_delete_files | Gauge | catalog, database, table | Number of position delete files in the table |
| table_summary_dangling_delete_files | Gauge | catalog, database, table | Number of dangling delete files in the table |
| table_summary_total_files_size | Gauge | catalog, database, table | Total size of files in the table |
| table_summary_data_files_size | Gauge | catalog, database, table | Size of data files in the table |
| table_summary_equality_delete_files_size | Gauge | catalog, database, table | Size of equality delete files in the table |
Expand Down

0 comments on commit b157261

Please sign in to comment.