From bc12dbc10718126703b4905188584e3a2a16e186 Mon Sep 17 00:00:00 2001 From: Darcy Date: Thu, 17 Oct 2024 19:42:28 +0800 Subject: [PATCH 1/4] feature: data-expire by partition info --- .../maintainer/IcebergTableMaintainer.java | 156 ++++++++++++++++-- .../optimizing/maintainer/TestDataExpire.java | 16 +- 2 files changed, 149 insertions(+), 23 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java index 86c846e116..2be0a667bf 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -45,6 +45,8 @@ import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Schema; @@ -63,6 +65,7 @@ import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SerializableFunction; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,8 +78,11 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -696,26 +702,152 @@ CloseableIterable fileScan( protected ExpireFiles expiredFileScan( DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) { - Map partitionFreshness = Maps.newConcurrentMap(); ExpireFiles expiredFiles = new ExpireFiles(); try (CloseableIterable entries = fileScan(table, dataFilter, expirationConfig)) { - Queue fileEntries = new LinkedTransferQueue<>(); - entries.forEach( - e -> { - if (mayExpired(e, partitionFreshness, expireTimestamp)) { - fileEntries.add(e); - } - }); - fileEntries - .parallelStream() - .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness)) - .forEach(expiredFiles::addFile); + boolean expireByPartitionSuccess = false; + if (expirationConfig + .getExpirationLevel() + .equals(DataExpirationConfig.ExpireLevel.PARTITION)) { + expireByPartitionSuccess = + tryExpireByPartition(entries, expirationConfig, expireTimestamp, expiredFiles); + } + if (!expireByPartitionSuccess) { + expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, expiredFiles); + } } catch (IOException e) { throw new RuntimeException(e); } return expiredFiles; } + private boolean tryExpireByPartition( + CloseableIterable entries, + DataExpirationConfig expirationConfig, + long expireTimestamp, + ExpireFiles expiredFiles) { + Types.NestedField expirationField = + table.schema().findField(expirationConfig.getExpirationField()); + Map> expirePartitionFieldsMap = + buildExpirePartitionFieldsMap(expirationField); + // All historical specs have expirationField as the partition field. + boolean allSpecsMatch = expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty); + if (allSpecsMatch) { + Comparable expirePartitionValue; + try { + expirePartitionValue = + getPartitionUpperBound(expirationConfig, expirationField, expireTimestamp); + } catch (IllegalArgumentException e) { + LOG.error("Failed to get partition upper bound", e); + return false; + } + + Map>> expirePartitionValueMap = + getExpirePartitionValueMap( + expirePartitionFieldsMap, expirationField, expirePartitionValue); + entries.forEach( + fileEntry -> { + List expiredList = new ArrayList<>(); + ContentFile contentFile = fileEntry.getFile(); + int fileSpecId = contentFile.specId(); + for (Map.Entry> entry : + expirePartitionValueMap.get(fileSpecId).entrySet()) { + Comparable partitionValue = + contentFile.partition().get(entry.getKey(), entry.getValue().getClass()); + boolean expired = partitionValue.compareTo(entry.getValue()) < 0; + expiredList.add(expired); + } + if (!expiredList.isEmpty() && expiredList.stream().allMatch(Boolean::booleanValue)) { + expiredFiles.addFile(fileEntry); + } + }); + return true; + } + return false; + } + + private void expireByMetricsUpperBound( + CloseableIterable entries, + DataExpirationConfig expirationConfig, + long expireTimestamp, + ExpireFiles expiredFiles) { + Map partitionFreshness = Maps.newConcurrentMap(); + Queue fileEntries = new LinkedTransferQueue<>(); + entries.forEach( + e -> { + if (mayExpired(e, partitionFreshness, expireTimestamp)) { + fileEntries.add(e); + } + }); + fileEntries + .parallelStream() + .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness)) + .forEach(expiredFiles::addFile); + } + + private Map> buildExpirePartitionFieldsMap( + Types.NestedField expireField) { + Map> partitionFieldsMap = new HashMap<>(); + for (Map.Entry entry : table.specs().entrySet()) { + int pos = 0; + Map posToField = new HashMap<>(); + for (PartitionField field : entry.getValue().fields()) { + if (field.sourceId() == expireField.fieldId()) { + posToField.put(pos, field); + } + pos++; + } + partitionFieldsMap.put(entry.getKey(), posToField); + } + + return partitionFieldsMap; + } + + private Map>> getExpirePartitionValueMap( + Map> expirePartitionFieldsMap, + Types.NestedField field, + Comparable expireValue) { + Map>> expirePartitionValue = new HashMap<>(); + for (Map.Entry> entry : + expirePartitionFieldsMap.entrySet()) { + Map> posToValue = new HashMap<>(); + for (Map.Entry posToField : entry.getValue().entrySet()) { + posToValue.put( + posToField.getKey(), + ((SerializableFunction, Comparable>) + posToField.getValue().transform().bind(field.type())) + .apply(expireValue)); + } + expirePartitionValue.put(entry.getKey(), posToValue); + } + return expirePartitionValue; + } + + private Comparable getPartitionUpperBound( + DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) { + switch (field.type().typeId()) { + case TIMESTAMP: + return expireTimestamp * 1000; + case LONG: + if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_MS)) { + return expireTimestamp; + } else if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_S)) { + return expireTimestamp / 1000; + } else { + throw new IllegalArgumentException( + "Number dateformat: " + expirationConfig.getNumberDateFormat()); + } + case STRING: + return LocalDateTime.ofInstant( + Instant.ofEpochMilli(expireTimestamp), getDefaultZoneId(field)) + .format( + DateTimeFormatter.ofPattern( + expirationConfig.getDateTimePattern(), Locale.getDefault())); + default: + throw new IllegalArgumentException( + "Unsupported expiration field type: " + field.type().typeId()); + } + } + /** * Create a filter expression for expired files for the `FILE` level. For the `PARTITION` level, * we need to collect the oldest files to determine if the partition is obsolete, so we will not diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java index 2d4f6fbb8d..469b037e58 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java @@ -194,17 +194,11 @@ private void testUnKeyedPartitionLevel() { List expected; if (tableTestHelper().partitionSpec().isPartitioned()) { - if (expireByStringDate()) { - expected = - Lists.newArrayList( - createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00")); - } else { - expected = - Lists.newArrayList( - createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"), - createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"), - createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00")); - } + expected = + Lists.newArrayList( + createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"), + createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"), + createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00")); } else { expected = Lists.newArrayList( From 54c3700afc24433a42490a2800778d4e9d19c99d Mon Sep 17 00:00:00 2001 From: Darcy Date: Tue, 3 Dec 2024 20:19:54 +0800 Subject: [PATCH 2/4] add comments --- .../optimizing/maintainer/IcebergTableMaintainer.java | 9 ++++++--- .../server/optimizing/maintainer/TestDataExpire.java | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java index 6cfcec1061..97a9d09730 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -705,9 +705,10 @@ protected ExpireFiles expiredFileScan( ExpireFiles expiredFiles = new ExpireFiles(); try (CloseableIterable entries = fileScan(table, dataFilter, expirationConfig)) { boolean expireByPartitionSuccess = false; - if (expirationConfig - .getExpirationLevel() - .equals(DataExpirationConfig.ExpireLevel.PARTITION)) { + if (!table.specs().isEmpty() + && expirationConfig + .getExpirationLevel() + .equals(DataExpirationConfig.ExpireLevel.PARTITION)) { expireByPartitionSuccess = tryExpireByPartition(entries, expirationConfig, expireTimestamp, expiredFiles); } @@ -786,6 +787,7 @@ private void expireByMetricsUpperBound( private Map> buildExpirePartitionFieldsMap( Types.NestedField expireField) { + // specId -> (partitionPos -> partitionField) Map> partitionFieldsMap = new HashMap<>(); for (Map.Entry entry : table.specs().entrySet()) { int pos = 0; @@ -825,6 +827,7 @@ private Map>> getExpirePartitionValueMap( private Comparable getPartitionUpperBound( DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) { switch (field.type().typeId()) { + // expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds case TIMESTAMP: return expireTimestamp * 1000; case LONG: diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java index 09ad7169cd..45823eb61d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java @@ -194,6 +194,7 @@ private void testUnKeyedPartitionLevel() { List expected; if (tableTestHelper().partitionSpec().isPartitioned()) { + // retention time is 1 day, expire partitions that order than 2022-01-02 expected = Lists.newArrayList( createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"), From aba77d829c5ad58c02209e596573371680af0687 Mon Sep 17 00:00:00 2001 From: Darcy Date: Fri, 6 Dec 2024 16:00:23 +0800 Subject: [PATCH 3/4] Refactor the code to improve readability. --- .../maintainer/IcebergTableMaintainer.java | 106 ++++++++---------- 1 file changed, 47 insertions(+), 59 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java index 97a9d09730..3eccac11cf 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -78,11 +78,9 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -728,38 +726,37 @@ private boolean tryExpireByPartition( ExpireFiles expiredFiles) { Types.NestedField expirationField = table.schema().findField(expirationConfig.getExpirationField()); - Map> expirePartitionFieldsMap = - buildExpirePartitionFieldsMap(expirationField); - // All historical specs have expirationField as the partition field. - boolean allSpecsMatch = expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty); - if (allSpecsMatch) { - Comparable expirePartitionValue; - try { - expirePartitionValue = - getPartitionUpperBound(expirationConfig, expirationField, expireTimestamp); - } catch (IllegalArgumentException e) { - LOG.error("Failed to get partition upper bound", e); - return false; - } - Map>> expirePartitionValueMap = - getExpirePartitionValueMap( - expirePartitionFieldsMap, expirationField, expirePartitionValue); + Comparable upperBound; + try { + upperBound = getExpireUpperBound(expirationConfig, expirationField, expireTimestamp); + } catch (IllegalArgumentException e) { + LOG.error("Failed to get partition upper bound", e); + return false; + } + + // all history versions expiration partition upper bound + Map>> allPartitionUpperBound = + getAllPartitionUpperBound(expirationField, upperBound); + + if (allPartitionUpperBound != null) { entries.forEach( fileEntry -> { - List expiredList = new ArrayList<>(); ContentFile contentFile = fileEntry.getFile(); int fileSpecId = contentFile.specId(); - for (Map.Entry> entry : - expirePartitionValueMap.get(fileSpecId).entrySet()) { - Comparable partitionValue = - contentFile.partition().get(entry.getKey(), entry.getValue().getClass()); - boolean expired = partitionValue.compareTo(entry.getValue()) < 0; - expiredList.add(expired); - } - if (!expiredList.isEmpty() && expiredList.stream().allMatch(Boolean::booleanValue)) { - expiredFiles.addFile(fileEntry); + Map> partitionUpperBound = + allPartitionUpperBound.get(fileSpecId); + for (Map.Entry> partitionPosToValue : + partitionUpperBound.entrySet()) { + Integer partitionPos = partitionPosToValue.getKey(); + Comparable partitionUpperBoundValue = partitionPosToValue.getValue(); + Comparable filePartitionValue = + contentFile.partition().get(partitionPos, partitionUpperBoundValue.getClass()); + if (filePartitionValue.compareTo(partitionUpperBoundValue) >= 0) { + return; + } } + expiredFiles.addFile(fileEntry); }); return true; } @@ -785,46 +782,37 @@ private void expireByMetricsUpperBound( .forEach(expiredFiles::addFile); } - private Map> buildExpirePartitionFieldsMap( - Types.NestedField expireField) { - // specId -> (partitionPos -> partitionField) - Map> partitionFieldsMap = new HashMap<>(); - for (Map.Entry entry : table.specs().entrySet()) { + private Map>> getAllPartitionUpperBound( + Types.NestedField expireField, Comparable upperBound) { + // specId -> (partitionPos -> partitionUpperBoundValue) + Map>> allPartitionUpperBound = new HashMap<>(); + for (Map.Entry spec : table.specs().entrySet()) { int pos = 0; - Map posToField = new HashMap<>(); - for (PartitionField field : entry.getValue().fields()) { + Map> partitionUpperBound = new HashMap<>(); + for (PartitionField field : spec.getValue().fields()) { if (field.sourceId() == expireField.fieldId()) { - posToField.put(pos, field); + if (field.transform().isVoid()) { + return null; + } + Comparable calculatedUpperBound = + ((SerializableFunction, Comparable>) + field.transform().bind(expireField.type())) + .apply(upperBound); + partitionUpperBound.put(pos, calculatedUpperBound); } pos++; } - partitionFieldsMap.put(entry.getKey(), posToField); - } - - return partitionFieldsMap; - } - - private Map>> getExpirePartitionValueMap( - Map> expirePartitionFieldsMap, - Types.NestedField field, - Comparable expireValue) { - Map>> expirePartitionValue = new HashMap<>(); - for (Map.Entry> entry : - expirePartitionFieldsMap.entrySet()) { - Map> posToValue = new HashMap<>(); - for (Map.Entry posToField : entry.getValue().entrySet()) { - posToValue.put( - posToField.getKey(), - ((SerializableFunction, Comparable>) - posToField.getValue().transform().bind(field.type())) - .apply(expireValue)); + // if the partition field is not found in the partition spec, return null + if (partitionUpperBound.isEmpty()) { + return null; } - expirePartitionValue.put(entry.getKey(), posToValue); + allPartitionUpperBound.put(spec.getKey(), partitionUpperBound); } - return expirePartitionValue; + + return allPartitionUpperBound; } - private Comparable getPartitionUpperBound( + private Comparable getExpireUpperBound( DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) { switch (field.type().typeId()) { // expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds From c1ebdd21713fa78ac5000e18ded13ec4c689cb5f Mon Sep 17 00:00:00 2001 From: Darcy Date: Mon, 30 Dec 2024 11:13:03 +0800 Subject: [PATCH 4/4] Refactor the code to improve readability. --- .../maintainer/IcebergTableMaintainer.java | 165 ++++++------------ .../maintainer/MixedTableMaintainer.java | 4 +- .../optimizing/maintainer/TestDataExpire.java | 17 +- 3 files changed, 67 insertions(+), 119 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java index 3eccac11cf..7e70fc3976 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -78,9 +78,10 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -654,7 +655,10 @@ private Set deleteInvalidMetadataFile( } CloseableIterable fileScan( - Table table, Expression dataFilter, DataExpirationConfig expirationConfig) { + Table table, + Expression dataFilter, + DataExpirationConfig expirationConfig, + long expireTimestamp) { TableScan tableScan = table.newScan().filter(dataFilter).includeColumnStats(); CloseableIterable tasks; @@ -684,6 +688,7 @@ CloseableIterable fileScan( .collect(Collectors.toSet()); Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); + Comparable expireValue = getExpireValue(expirationConfig, field, expireTimestamp); return CloseableIterable.transform( CloseableIterable.withNoopClose(Iterables.concat(dataFiles, deleteFiles)), contentFile -> { @@ -693,126 +698,36 @@ CloseableIterable fileScan( field, DateTimeFormatter.ofPattern( expirationConfig.getDateTimePattern(), Locale.getDefault()), - expirationConfig.getNumberDateFormat()); + expirationConfig.getNumberDateFormat(), + expireValue); return new FileEntry(contentFile.copyWithoutStats(), literal); }); } protected ExpireFiles expiredFileScan( DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) { + Map partitionFreshness = Maps.newConcurrentMap(); ExpireFiles expiredFiles = new ExpireFiles(); - try (CloseableIterable entries = fileScan(table, dataFilter, expirationConfig)) { - boolean expireByPartitionSuccess = false; - if (!table.specs().isEmpty() - && expirationConfig - .getExpirationLevel() - .equals(DataExpirationConfig.ExpireLevel.PARTITION)) { - expireByPartitionSuccess = - tryExpireByPartition(entries, expirationConfig, expireTimestamp, expiredFiles); - } - if (!expireByPartitionSuccess) { - expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, expiredFiles); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return expiredFiles; - } - - private boolean tryExpireByPartition( - CloseableIterable entries, - DataExpirationConfig expirationConfig, - long expireTimestamp, - ExpireFiles expiredFiles) { - Types.NestedField expirationField = - table.schema().findField(expirationConfig.getExpirationField()); - - Comparable upperBound; - try { - upperBound = getExpireUpperBound(expirationConfig, expirationField, expireTimestamp); - } catch (IllegalArgumentException e) { - LOG.error("Failed to get partition upper bound", e); - return false; - } - - // all history versions expiration partition upper bound - Map>> allPartitionUpperBound = - getAllPartitionUpperBound(expirationField, upperBound); - - if (allPartitionUpperBound != null) { + try (CloseableIterable entries = + fileScan(table, dataFilter, expirationConfig, expireTimestamp)) { + Queue fileEntries = new LinkedTransferQueue<>(); entries.forEach( - fileEntry -> { - ContentFile contentFile = fileEntry.getFile(); - int fileSpecId = contentFile.specId(); - Map> partitionUpperBound = - allPartitionUpperBound.get(fileSpecId); - for (Map.Entry> partitionPosToValue : - partitionUpperBound.entrySet()) { - Integer partitionPos = partitionPosToValue.getKey(); - Comparable partitionUpperBoundValue = partitionPosToValue.getValue(); - Comparable filePartitionValue = - contentFile.partition().get(partitionPos, partitionUpperBoundValue.getClass()); - if (filePartitionValue.compareTo(partitionUpperBoundValue) >= 0) { - return; - } + e -> { + if (mayExpired(e, partitionFreshness, expireTimestamp)) { + fileEntries.add(e); } - expiredFiles.addFile(fileEntry); }); - return true; - } - return false; - } - - private void expireByMetricsUpperBound( - CloseableIterable entries, - DataExpirationConfig expirationConfig, - long expireTimestamp, - ExpireFiles expiredFiles) { - Map partitionFreshness = Maps.newConcurrentMap(); - Queue fileEntries = new LinkedTransferQueue<>(); - entries.forEach( - e -> { - if (mayExpired(e, partitionFreshness, expireTimestamp)) { - fileEntries.add(e); - } - }); - fileEntries - .parallelStream() - .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness)) - .forEach(expiredFiles::addFile); - } - - private Map>> getAllPartitionUpperBound( - Types.NestedField expireField, Comparable upperBound) { - // specId -> (partitionPos -> partitionUpperBoundValue) - Map>> allPartitionUpperBound = new HashMap<>(); - for (Map.Entry spec : table.specs().entrySet()) { - int pos = 0; - Map> partitionUpperBound = new HashMap<>(); - for (PartitionField field : spec.getValue().fields()) { - if (field.sourceId() == expireField.fieldId()) { - if (field.transform().isVoid()) { - return null; - } - Comparable calculatedUpperBound = - ((SerializableFunction, Comparable>) - field.transform().bind(expireField.type())) - .apply(upperBound); - partitionUpperBound.put(pos, calculatedUpperBound); - } - pos++; - } - // if the partition field is not found in the partition spec, return null - if (partitionUpperBound.isEmpty()) { - return null; - } - allPartitionUpperBound.put(spec.getKey(), partitionUpperBound); + fileEntries + .parallelStream() + .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness)) + .forEach(expiredFiles::addFile); + } catch (IOException e) { + throw new RuntimeException(e); } - - return allPartitionUpperBound; + return expiredFiles; } - private Comparable getExpireUpperBound( + private Comparable getExpireValue( DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) { switch (field.type().typeId()) { // expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds @@ -1040,17 +955,20 @@ static boolean willNotRetain( } } - private static Literal getExpireTimestampLiteral( + private Literal getExpireTimestampLiteral( ContentFile contentFile, Types.NestedField field, DateTimeFormatter formatter, - String numberDateFormatter) { + String numberDateFormatter, + Comparable expireValue) { Type type = field.type(); Object upperBound = Conversions.fromByteBuffer(type, contentFile.upperBounds().get(field.fieldId())); Literal literal = Literal.of(Long.MAX_VALUE); if (null == upperBound) { - return literal; + if (canBeExpireByPartition(contentFile, field, expireValue)) { + literal = Literal.of(0L); + } } else if (upperBound instanceof Long) { switch (type.typeId()) { case TIMESTAMP: @@ -1077,6 +995,29 @@ private static Literal getExpireTimestampLiteral( return literal; } + private boolean canBeExpireByPartition( + ContentFile contentFile, Types.NestedField expireField, Comparable expireValue) { + PartitionSpec partitionSpec = table.specs().get(contentFile.specId()); + int pos = 0; + List compareResults = new ArrayList<>(); + for (PartitionField partitionField : partitionSpec.fields()) { + if (partitionField.sourceId() == expireField.fieldId()) { + if (partitionField.transform().isVoid()) { + return false; + } + Comparable partitionUpperBound = + ((SerializableFunction, Comparable>) + partitionField.transform().bind(expireField.type())) + .apply(expireValue); + Comparable filePartitionValue = + contentFile.partition().get(pos, partitionUpperBound.getClass()); + compareResults.add(filePartitionValue.compareTo(partitionUpperBound) < 0); + } + pos++; + } + return !compareResults.isEmpty() && compareResults.stream().allMatch(Boolean::booleanValue); + } + public Table getTable() { return table; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java index 1b5c8cbd82..c36a870da3 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java @@ -195,11 +195,11 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan CloseableIterable changeEntries = CloseableIterable.transform( - changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig), + changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig, expireTimestamp), e -> new MixedFileEntry(e.getFile(), e.getTsBound(), true)); CloseableIterable baseEntries = CloseableIterable.transform( - baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig), + baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig, expireTimestamp), e -> new MixedFileEntry(e.getFile(), e.getTsBound(), false)); IcebergTableMaintainer.ExpireFiles changeExpiredFiles = new IcebergTableMaintainer.ExpireFiles(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java index 45823eb61d..53a977b1b5 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java @@ -195,11 +195,17 @@ private void testUnKeyedPartitionLevel() { List expected; if (tableTestHelper().partitionSpec().isPartitioned()) { // retention time is 1 day, expire partitions that order than 2022-01-02 - expected = - Lists.newArrayList( - createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"), - createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"), - createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00")); + if (expireByStringDate()) { + expected = + Lists.newArrayList( + createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00")); + } else { + expected = + Lists.newArrayList( + createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"), + createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"), + createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00")); + } } else { expected = Lists.newArrayList( @@ -586,6 +592,7 @@ protected static Map getDefaultProp() { prop.put(TableProperties.ENABLE_DATA_EXPIRATION, "true"); prop.put(TableProperties.DATA_EXPIRATION_FIELD, "op_time"); prop.put(TableProperties.DATA_EXPIRATION_RETENTION_TIME, "1d"); + prop.put("write.metadata.metrics.default", "none"); return prop; }