Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: data-expire by partition info #3273

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
Expand All @@ -63,6 +65,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -75,8 +78,10 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -650,7 +655,10 @@ private Set<String> deleteInvalidMetadataFile(
}

CloseableIterable<FileEntry> fileScan(
Table table, Expression dataFilter, DataExpirationConfig expirationConfig) {
Table table,
Expression dataFilter,
DataExpirationConfig expirationConfig,
long expireTimestamp) {
TableScan tableScan = table.newScan().filter(dataFilter).includeColumnStats();

CloseableIterable<FileScanTask> tasks;
Expand Down Expand Up @@ -680,6 +688,7 @@ CloseableIterable<FileEntry> fileScan(
.collect(Collectors.toSet());

Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField());
Comparable<?> expireValue = getExpireValue(expirationConfig, field, expireTimestamp);
return CloseableIterable.transform(
CloseableIterable.withNoopClose(Iterables.concat(dataFiles, deleteFiles)),
contentFile -> {
Expand All @@ -689,7 +698,8 @@ CloseableIterable<FileEntry> fileScan(
field,
DateTimeFormatter.ofPattern(
expirationConfig.getDateTimePattern(), Locale.getDefault()),
expirationConfig.getNumberDateFormat());
expirationConfig.getNumberDateFormat(),
expireValue);
return new FileEntry(contentFile.copyWithoutStats(), literal);
});
}
Expand All @@ -698,7 +708,8 @@ protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, expirationConfig)) {
try (CloseableIterable<FileEntry> entries =
fileScan(table, dataFilter, expirationConfig, expireTimestamp)) {
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
Expand All @@ -716,6 +727,33 @@ protected ExpireFiles expiredFileScan(
return expiredFiles;
}

private Comparable<?> getExpireValue(
DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) {
switch (field.type().typeId()) {
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
// expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds
case TIMESTAMP:
return expireTimestamp * 1000;
case LONG:
if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_MS)) {
return expireTimestamp;
} else if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_S)) {
return expireTimestamp / 1000;
} else {
throw new IllegalArgumentException(
"Number dateformat: " + expirationConfig.getNumberDateFormat());
}
case STRING:
return LocalDateTime.ofInstant(
Instant.ofEpochMilli(expireTimestamp), getDefaultZoneId(field))
.format(
DateTimeFormatter.ofPattern(
expirationConfig.getDateTimePattern(), Locale.getDefault()));
default:
throw new IllegalArgumentException(
"Unsupported expiration field type: " + field.type().typeId());
}
}

/**
* Create a filter expression for expired files for the `FILE` level. For the `PARTITION` level,
* we need to collect the oldest files to determine if the partition is obsolete, so we will not
Expand Down Expand Up @@ -917,17 +955,20 @@ static boolean willNotRetain(
}
}

private static Literal<Long> getExpireTimestampLiteral(
private Literal<Long> getExpireTimestampLiteral(
ContentFile<?> contentFile,
Types.NestedField field,
DateTimeFormatter formatter,
String numberDateFormatter) {
String numberDateFormatter,
Comparable<?> expireValue) {
Type type = field.type();
Object upperBound =
Conversions.fromByteBuffer(type, contentFile.upperBounds().get(field.fieldId()));
Literal<Long> literal = Literal.of(Long.MAX_VALUE);
if (null == upperBound) {
return literal;
if (canBeExpireByPartition(contentFile, field, expireValue)) {
literal = Literal.of(0L);
}
} else if (upperBound instanceof Long) {
switch (type.typeId()) {
case TIMESTAMP:
Expand All @@ -954,6 +995,29 @@ private static Literal<Long> getExpireTimestampLiteral(
return literal;
}

/**
 * Decides, from partition values alone, whether a file holds only expired data. Used when the
 * file carries no upper-bound metric for the expiration field.
 *
 * <p>Every partition field derived from the expiration field is considered. The expire value is
 * passed through the partition transform and the file's partition value must be strictly smaller
 * than the result. Only order-preserving transforms are compared: for a transform such as bucket,
 * "smaller partition value" says nothing about the source values, so such fields are ignored.
 *
 * @param contentFile the data or delete file being examined
 * @param expireField the expiration field of the table schema
 * @param expireValue the expiration threshold, in the unit/format of {@code expireField}
 * @return {@code true} only if at least one order-preserving partition field on the expiration
 *     field exists and all such fields are strictly below the transformed threshold;
 *     {@code false} when nothing can be proven (unknown spec, void transform, null partition
 *     value, or no usable partition field)
 */
private boolean canBeExpireByPartition(
    ContentFile<?> contentFile, Types.NestedField expireField, Comparable<?> expireValue) {
  PartitionSpec partitionSpec = table.specs().get(contentFile.specId());
  if (partitionSpec == null) {
    // Unknown spec id: nothing can be proven, keep the file.
    return false;
  }

  List<PartitionField> partitionFields = partitionSpec.fields();
  boolean comparedAny = false;
  // The position in the partition tuple matches the position in the spec's field list.
  for (int pos = 0; pos < partitionFields.size(); pos++) {
    PartitionField partitionField = partitionFields.get(pos);
    if (partitionField.sourceId() != expireField.fieldId()) {
      continue;
    }
    if (partitionField.transform().isVoid()) {
      return false;
    }
    if (!partitionField.transform().preservesOrder()) {
      // e.g. bucket: ordering of transformed values is unrelated to ordering of source values.
      continue;
    }

    @SuppressWarnings("unchecked")
    Comparable<?> partitionUpperBound =
        ((SerializableFunction<Comparable<?>, Comparable<?>>)
                partitionField.transform().bind(expireField.type()))
            .apply(expireValue);
    if (partitionUpperBound == null) {
      return false;
    }

    Comparable<Object> filePartitionValue =
        contentFile.partition().get(pos, partitionUpperBound.getClass());
    // A null partition value cannot be proven expired; treat it as "keep".
    if (filePartitionValue == null || filePartitionValue.compareTo(partitionUpperBound) >= 0) {
      return false;
    }
    comparedAny = true;
  }
  return comparedAny;
}

/**
 * Returns the table held by this maintainer.
 *
 * @return the {@code table} field
 */
public Table getTable() {
  return this.table;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan

CloseableIterable<MixedFileEntry> changeEntries =
CloseableIterable.transform(
changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig),
changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig, expireTimestamp),
e -> new MixedFileEntry(e.getFile(), e.getTsBound(), true));
CloseableIterable<MixedFileEntry> baseEntries =
CloseableIterable.transform(
baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig),
baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig, expireTimestamp),
e -> new MixedFileEntry(e.getFile(), e.getTsBound(), false));
IcebergTableMaintainer.ExpireFiles changeExpiredFiles =
new IcebergTableMaintainer.ExpireFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ private void testUnKeyedPartitionLevel() {

List<Record> expected;
if (tableTestHelper().partitionSpec().isPartitioned()) {
// retention time is 1 day, expire partitions that are older than 2022-01-02
if (expireByStringDate()) {
expected =
Lists.newArrayList(
Expand Down Expand Up @@ -591,6 +592,7 @@ protected static Map<String, String> getDefaultProp() {
prop.put(TableProperties.ENABLE_DATA_EXPIRATION, "true");
prop.put(TableProperties.DATA_EXPIRATION_FIELD, "op_time");
prop.put(TableProperties.DATA_EXPIRATION_RETENTION_TIME, "1d");
prop.put("write.metadata.metrics.default", "none");
return prop;
}

Expand Down
Loading