From b1572613ceddbb396cdd1875cd13ed0322b6bf51 Mon Sep 17 00:00:00 2001 From: Marig_Weizhi Date: Tue, 24 Sep 2024 10:30:23 +0800 Subject: [PATCH] [AMORO-3220] Optimize table summary metrics to fetch data (#3221) Optimize table summary metrics to fetch data --- .../optimizing/plan/OptimizingEvaluator.java | 96 +++++++++++++--- .../server/table/TableSummaryMetrics.java | 104 +++++++++--------- .../server/table/TestTableSummaryMetrics.java | 4 + docs/user-guides/metrics.md | 1 + 4 files changed, 137 insertions(+), 68 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java index 765bdf3bb1..bfd90f35da 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java @@ -37,11 +37,14 @@ import org.apache.amoro.utils.MixedTableUtil; import org.apache.amoro.utils.TablePropertyUtil; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.StructLike; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,6 +180,13 @@ public PendingInput getPendingInput() { if (!isInitialized) { initEvaluator(); } + // Dangling delete files will cause the data scanned by TableScan + // to be inconsistent with the snapshot summary of iceberg + if (TableFormat.ICEBERG == mixedTable.format()) { + Snapshot snapshot = mixedTable.asUnkeyedTable().snapshot(currentSnapshot.snapshotId()); + return new PendingInput(partitionPlanMap.values(), snapshot); + } + return new 
PendingInput(partitionPlanMap.values()); } @@ -191,37 +201,73 @@ public static class PendingInput { @JsonIgnore private final Map> partitions = Maps.newHashMap(); + private int totalFileCount = 0; + private long totalFileSize = 0L; + private long totalFileRecords = 0L; private int dataFileCount = 0; - private long dataFileSize = 0; - private long dataFileRecords = 0; + private long dataFileSize = 0L; + private long dataFileRecords = 0L; private int equalityDeleteFileCount = 0; private int positionalDeleteFileCount = 0; private long positionalDeleteBytes = 0L; private long equalityDeleteBytes = 0L; private long equalityDeleteFileRecords = 0L; private long positionalDeleteFileRecords = 0L; + private int danglingDeleteFileCount = 0; private int healthScore = -1; // -1 means not calculated public PendingInput() {} public PendingInput(Collection evaluators) { + initialize(evaluators); + totalFileCount = dataFileCount + positionalDeleteFileCount + equalityDeleteFileCount; + totalFileSize = dataFileSize + positionalDeleteBytes + equalityDeleteBytes; + totalFileRecords = dataFileRecords + positionalDeleteFileRecords + equalityDeleteFileRecords; + } + + public PendingInput(Collection evaluators, Snapshot snapshot) { + initialize(evaluators); + Map summary = snapshot.summary(); + int totalDeleteFiles = + PropertyUtil.propertyAsInt(summary, SnapshotSummary.TOTAL_DELETE_FILES_PROP, 0); + int totalDataFiles = + PropertyUtil.propertyAsInt(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0); + totalFileRecords = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_RECORDS_PROP, 0); + totalFileSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0); + totalFileCount = totalDeleteFiles + totalDataFiles; + danglingDeleteFileCount = + totalDeleteFiles - equalityDeleteFileCount - positionalDeleteFileCount; + } + + private void initialize(Collection evaluators) { double totalHealthScore = 0; for (PartitionEvaluator evaluator : evaluators) { - partitions 
- .computeIfAbsent(evaluator.getPartition().first(), ignore -> Sets.newHashSet()) - .add(evaluator.getPartition().second()); - dataFileCount += evaluator.getFragmentFileCount() + evaluator.getSegmentFileCount(); - dataFileSize += evaluator.getFragmentFileSize() + evaluator.getSegmentFileSize(); - dataFileRecords += evaluator.getFragmentFileRecords() + evaluator.getSegmentFileRecords(); - positionalDeleteBytes += evaluator.getPosDeleteFileSize(); - positionalDeleteFileRecords += evaluator.getPosDeleteFileRecords(); - positionalDeleteFileCount += evaluator.getPosDeleteFileCount(); - equalityDeleteBytes += evaluator.getEqualityDeleteFileSize(); - equalityDeleteFileRecords += evaluator.getEqualityDeleteFileRecords(); - equalityDeleteFileCount += evaluator.getEqualityDeleteFileCount(); + addPartitionData(evaluator); totalHealthScore += evaluator.getHealthScore(); } - healthScore = (int) Math.ceil(totalHealthScore / evaluators.size()); + healthScore = avgHealthScore(totalHealthScore, evaluators.size()); + } + + private void addPartitionData(PartitionEvaluator evaluator) { + partitions + .computeIfAbsent(evaluator.getPartition().first(), ignore -> Sets.newHashSet()) + .add(evaluator.getPartition().second()); + dataFileCount += evaluator.getFragmentFileCount() + evaluator.getSegmentFileCount(); + dataFileSize += evaluator.getFragmentFileSize() + evaluator.getSegmentFileSize(); + dataFileRecords += evaluator.getFragmentFileRecords() + evaluator.getSegmentFileRecords(); + positionalDeleteBytes += evaluator.getPosDeleteFileSize(); + positionalDeleteFileRecords += evaluator.getPosDeleteFileRecords(); + positionalDeleteFileCount += evaluator.getPosDeleteFileCount(); + equalityDeleteBytes += evaluator.getEqualityDeleteFileSize(); + equalityDeleteFileRecords += evaluator.getEqualityDeleteFileRecords(); + equalityDeleteFileCount += evaluator.getEqualityDeleteFileCount(); + } + + private int avgHealthScore(double totalHealthScore, int partitionCount) { + if (partitionCount == 0) { 
+ return 0; + } + return (int) Math.ceil(totalHealthScore / partitionCount); } public Map> getPartitions() { @@ -268,9 +314,28 @@ public int getHealthScore() { return healthScore; } + public int getTotalFileCount() { + return totalFileCount; + } + + public long getTotalFileSize() { + return totalFileSize; + } + + public long getTotalFileRecords() { + return totalFileRecords; + } + + public int getDanglingDeleteFileCount() { + return danglingDeleteFileCount; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) + .add("totalFileCount", totalFileCount) + .add("totalFileSize", totalFileSize) + .add("totalFileRecords", totalFileRecords) .add("partitions", partitions) .add("dataFileCount", dataFileCount) .add("dataFileSize", dataFileSize) @@ -282,6 +347,7 @@ public String toString() { .add("equalityDeleteFileRecords", equalityDeleteFileRecords) .add("positionalDeleteFileRecords", positionalDeleteFileRecords) .add("healthScore", healthScore) + .add("danglingDeleteFileCount", danglingDeleteFileCount) .toString(); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableSummaryMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableSummaryMetrics.java index 748f400eee..4ad4f55d17 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableSummaryMetrics.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableSummaryMetrics.java @@ -62,6 +62,12 @@ public class TableSummaryMetrics { .withTags("catalog", "database", "table") .build(); + public static final MetricDefine TABLE_SUMMARY_DANGLING_DELETE_FILES = + defineGauge("table_summary_dangling_delete_files") + .withDescription("Number of dangling delete files in the table") + .withTags("catalog", "database", "table") + .build(); + // table summary files size metrics public static final MetricDefine TABLE_SUMMARY_TOTAL_FILES_SIZE = defineGauge("table_summary_total_files_size") @@ -128,22 +134,10 @@ public class TableSummaryMetrics { 
private final ServerTableIdentifier identifier; private final List registeredMetricKeys = Lists.newArrayList(); + private OptimizingEvaluator.PendingInput tableSummary = new OptimizingEvaluator.PendingInput(); private MetricRegistry globalRegistry; - private long totalFiles = 0L; - private long dataFiles = 0L; - private long positionDeleteFiles = 0L; - private long equalityDeleteFiles = 0L; - private long totalFilesSize = 0L; - private long positionDeleteFilesSize = 0L; - private long dataFilesSize = 0L; - private long equalityDeleteFilesSize = 0L; - private long positionDeleteFilesRecords = 0L; - private long totalRecords = 0L; - private long dataFilesRecords = 0L; - private long equalityDeleteFilesRecords = 0L; private long snapshots = 0L; - private long healthScore = -1L; // -1 means not calculated public TableSummaryMetrics(ServerTableIdentifier identifier) { this.identifier = identifier; @@ -167,44 +161,72 @@ private void registerMetric(MetricRegistry registry, MetricDefine define, Metric public void register(MetricRegistry registry) { if (globalRegistry == null) { // register files number metrics - registerMetric(registry, TABLE_SUMMARY_TOTAL_FILES, (Gauge) () -> totalFiles); - registerMetric(registry, TABLE_SUMMARY_DATA_FILES, (Gauge) () -> dataFiles); registerMetric( - registry, TABLE_SUMMARY_POSITION_DELETE_FILES, (Gauge) () -> positionDeleteFiles); + registry, + TABLE_SUMMARY_TOTAL_FILES, + (Gauge) () -> (long) tableSummary.getTotalFileCount()); + registerMetric( + registry, + TABLE_SUMMARY_DATA_FILES, + (Gauge) () -> (long) tableSummary.getDataFileCount()); + registerMetric( + registry, + TABLE_SUMMARY_POSITION_DELETE_FILES, + (Gauge) () -> (long) tableSummary.getPositionalDeleteFileCount()); + registerMetric( + registry, + TABLE_SUMMARY_EQUALITY_DELETE_FILES, + (Gauge) () -> (long) tableSummary.getEqualityDeleteFileCount()); registerMetric( - registry, TABLE_SUMMARY_EQUALITY_DELETE_FILES, (Gauge) () -> equalityDeleteFiles); + registry, + 
TABLE_SUMMARY_DANGLING_DELETE_FILES, + (Gauge) () -> (long) tableSummary.getDanglingDeleteFileCount()); // register files size metrics - registerMetric(registry, TABLE_SUMMARY_TOTAL_FILES_SIZE, (Gauge) () -> totalFilesSize); - registerMetric(registry, TABLE_SUMMARY_DATA_FILES_SIZE, (Gauge) () -> dataFilesSize); + registerMetric( + registry, + TABLE_SUMMARY_TOTAL_FILES_SIZE, + (Gauge) () -> tableSummary.getTotalFileSize()); + registerMetric( + registry, + TABLE_SUMMARY_DATA_FILES_SIZE, + (Gauge) () -> tableSummary.getDataFileSize()); registerMetric( registry, TABLE_SUMMARY_POSITION_DELETE_FILES_SIZE, - (Gauge) () -> positionDeleteFilesSize); + (Gauge) () -> tableSummary.getPositionalDeleteBytes()); registerMetric( registry, TABLE_SUMMARY_EQUALITY_DELETE_FILES_SIZE, - (Gauge) () -> equalityDeleteFilesSize); + (Gauge) () -> tableSummary.getEqualityDeleteBytes()); // register files records metrics - registerMetric(registry, TABLE_SUMMARY_TOTAL_RECORDS, (Gauge) () -> totalRecords); registerMetric( - registry, TABLE_SUMMARY_DATA_FILES_RECORDS, (Gauge) () -> dataFilesRecords); + registry, + TABLE_SUMMARY_TOTAL_RECORDS, + (Gauge) () -> tableSummary.getTotalFileRecords()); + registerMetric( + registry, + TABLE_SUMMARY_DATA_FILES_RECORDS, + (Gauge) () -> tableSummary.getDataFileRecords()); registerMetric( registry, TABLE_SUMMARY_POSITION_DELETE_FILES_RECORDS, - (Gauge) () -> positionDeleteFilesRecords); + (Gauge) () -> tableSummary.getPositionalDeleteFileRecords()); registerMetric( registry, TABLE_SUMMARY_EQUALITY_DELETE_FILES_RECORDS, - (Gauge) () -> equalityDeleteFilesRecords); + (Gauge) () -> tableSummary.getEqualityDeleteFileRecords()); + + // register health score metric + registerMetric( + registry, + TABLE_SUMMARY_HEALTH_SCORE, + (Gauge) () -> (long) tableSummary.getHealthScore()); // register snapshots number metric registerMetric(registry, TABLE_SUMMARY_SNAPSHOTS, (Gauge) () -> snapshots); - // register health score metric - registerMetric(registry, 
TABLE_SUMMARY_HEALTH_SCORE, (Gauge) () -> healthScore); - globalRegistry = registry; } } @@ -219,31 +241,7 @@ public void refresh(OptimizingEvaluator.PendingInput tableSummary) { if (tableSummary == null) { return; } - totalFiles = - tableSummary.getDataFileCount() - + tableSummary.getEqualityDeleteFileCount() - + tableSummary.getPositionalDeleteFileCount(); - dataFiles = tableSummary.getDataFileCount(); - positionDeleteFiles = tableSummary.getPositionalDeleteFileCount(); - equalityDeleteFiles = tableSummary.getEqualityDeleteFileCount(); - - totalFilesSize = - tableSummary.getDataFileSize() - + tableSummary.getEqualityDeleteBytes() - + tableSummary.getPositionalDeleteBytes(); - positionDeleteFilesSize = tableSummary.getPositionalDeleteBytes(); - dataFilesSize = tableSummary.getDataFileSize(); - equalityDeleteFilesSize = tableSummary.getEqualityDeleteBytes(); - - totalRecords = - tableSummary.getDataFileRecords() - + tableSummary.getEqualityDeleteFileRecords() - + tableSummary.getPositionalDeleteFileRecords(); - positionDeleteFilesRecords = tableSummary.getPositionalDeleteFileRecords(); - dataFilesRecords = tableSummary.getDataFileRecords(); - equalityDeleteFilesRecords = tableSummary.getEqualityDeleteFileRecords(); - - healthScore = tableSummary.getHealthScore(); + this.tableSummary = tableSummary; } public void refreshSnapshots(MixedTable table) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java index d8d3b9d196..d5d42186c9 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java @@ -18,6 +18,7 @@ package org.apache.amoro.server.table; +import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_DANGLING_DELETE_FILES; import static 
org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_DATA_FILES; import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_DATA_FILES_RECORDS; import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_DATA_FILES_SIZE; @@ -154,6 +155,8 @@ public void testTableSummaryMetrics() { Gauge dataFiles = getMetric(metrics, identifier, TABLE_SUMMARY_DATA_FILES); Gauge posDelFiles = getMetric(metrics, identifier, TABLE_SUMMARY_POSITION_DELETE_FILES); Gauge eqDelFiles = getMetric(metrics, identifier, TABLE_SUMMARY_EQUALITY_DELETE_FILES); + Gauge danglingDelFiles = + getMetric(metrics, identifier, TABLE_SUMMARY_DANGLING_DELETE_FILES); Gauge totalSize = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES_SIZE); Gauge dataSize = getMetric(metrics, identifier, TABLE_SUMMARY_DATA_FILES_SIZE); @@ -177,6 +180,7 @@ public void testTableSummaryMetrics() { Assertions.assertEquals(0, dataFiles.getValue()); Assertions.assertEquals(0, posDelFiles.getValue()); Assertions.assertEquals(0, eqDelFiles.getValue()); + Assertions.assertEquals(0, danglingDelFiles.getValue()); Assertions.assertEquals(0, totalSize.getValue()); Assertions.assertEquals(0, dataSize.getValue()); diff --git a/docs/user-guides/metrics.md b/docs/user-guides/metrics.md index 435413af70..22855f9472 100644 --- a/docs/user-guides/metrics.md +++ b/docs/user-guides/metrics.md @@ -108,6 +108,7 @@ Amoro has supported built-in metrics to measure status of table self-optimizing | table_summary_data_files | Gauge | catalog, database, table | Number of data files in the table | | table_summary_equality_delete_files | Gauge | catalog, database, table | Number of equality delete files in the table | | table_summary_position_delete_files | Gauge | catalog, database, table | Number of position delete files in the table | +| table_summary_dangling_delete_files | Gauge | catalog, database, table | Number of dangling delete files in the table | | table_summary_total_files_size | Gauge | 
catalog, database, table | Total size of files in the table | | table_summary_data_files_size | Gauge | catalog, database, table | Size of data files in the table | | table_summary_equality_delete_files_size | Gauge | catalog, database, table | Size of equality delete files in the table |