Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: data-expire by partition info #3273

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
Expand All @@ -63,6 +65,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -75,8 +78,11 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -696,26 +702,155 @@ CloseableIterable<FileEntry> fileScan(

protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, expirationConfig)) {
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
if (mayExpired(e, partitionFreshness, expireTimestamp)) {
fileEntries.add(e);
}
});
fileEntries
.parallelStream()
.filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
.forEach(expiredFiles::addFile);
boolean expireByPartitionSuccess = false;
if (!table.specs().isEmpty()
&& expirationConfig
.getExpirationLevel()
.equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
expireByPartitionSuccess =
tryExpireByPartition(entries, expirationConfig, expireTimestamp, expiredFiles);
}
if (!expireByPartitionSuccess) {
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, expiredFiles);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return expiredFiles;
}

private boolean tryExpireByPartition(
CloseableIterable<FileEntry> entries,
DataExpirationConfig expirationConfig,
long expireTimestamp,
ExpireFiles expiredFiles) {
Types.NestedField expirationField =
table.schema().findField(expirationConfig.getExpirationField());
Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
buildExpirePartitionFieldsMap(expirationField);
// All historical specs have expirationField as the partition field.
boolean allSpecsMatch = expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
if (allSpecsMatch) {
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
Comparable<?> expirePartitionValue;
try {
expirePartitionValue =
getPartitionUpperBound(expirationConfig, expirationField, expireTimestamp);
} catch (IllegalArgumentException e) {
LOG.error("Failed to get partition upper bound", e);
return false;
}

Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
getExpirePartitionValueMap(
expirePartitionFieldsMap, expirationField, expirePartitionValue);
entries.forEach(
fileEntry -> {
List<Boolean> expiredList = new ArrayList<>();
ContentFile<?> contentFile = fileEntry.getFile();
int fileSpecId = contentFile.specId();
for (Map.Entry<Integer, Comparable<?>> entry :
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
expirePartitionValueMap.get(fileSpecId).entrySet()) {
Comparable<Object> partitionValue =
contentFile.partition().get(entry.getKey(), entry.getValue().getClass());
boolean expired = partitionValue.compareTo(entry.getValue()) < 0;
expiredList.add(expired);
}
if (!expiredList.isEmpty() && expiredList.stream().allMatch(Boolean::booleanValue)) {
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
expiredFiles.addFile(fileEntry);
}
});
return true;
}
return false;
}

/**
 * Expires files using the two-step {@code mayExpired} / {@code willNotRetain} checks.
 *
 * <p>Step one walks every scanned entry sequentially and keeps those for which {@code mayExpired}
 * returns {@code true}; the shared per-partition freshness map is passed to that check. Step two
 * filters the kept candidates in parallel with {@code willNotRetain}, again consulting the same
 * map, and adds the survivors to {@code expiredFiles}.
 *
 * @param entries scanned file entries
 * @param expirationConfig data expiration settings
 * @param expireTimestamp expiration instant in epoch milliseconds
 * @param expiredFiles sink that receives the expired entries
 */
private void expireByMetricsUpperBound(
    CloseableIterable<FileEntry> entries,
    DataExpirationConfig expirationConfig,
    long expireTimestamp,
    ExpireFiles expiredFiles) {
  Map<StructLike, DataFileFreshness> freshnessByPartition = Maps.newConcurrentMap();
  Queue<FileEntry> candidates = new LinkedTransferQueue<>();

  // Pass 1: sequential pre-selection of entries that might be expired.
  for (FileEntry scanned : entries) {
    if (mayExpired(scanned, freshnessByPartition, expireTimestamp)) {
      candidates.add(scanned);
    }
  }

  // Pass 2: parallel retention check over the pre-selected candidates.
  candidates
      .parallelStream()
      .filter(candidate -> willNotRetain(candidate, expirationConfig, freshnessByPartition))
      .forEach(expiredFiles::addFile);
}

private Map<Integer, Map<Integer, PartitionField>> buildExpirePartitionFieldsMap(
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
Types.NestedField expireField) {
// specId -> (partitionPos -> partitionField)
Map<Integer, Map<Integer, PartitionField>> partitionFieldsMap = new HashMap<>();
for (Map.Entry<Integer, PartitionSpec> entry : table.specs().entrySet()) {
int pos = 0;
Map<Integer, PartitionField> posToField = new HashMap<>();
for (PartitionField field : entry.getValue().fields()) {
if (field.sourceId() == expireField.fieldId()) {
posToField.put(pos, field);
}
pos++;
}
partitionFieldsMap.put(entry.getKey(), posToField);
}

return partitionFieldsMap;
}

private Map<Integer, Map<Integer, Comparable<?>>> getExpirePartitionValueMap(
Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap,
Types.NestedField field,
Comparable<?> expireValue) {
Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValue = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, PartitionField>> entry :
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
expirePartitionFieldsMap.entrySet()) {
Map<Integer, Comparable<?>> posToValue = new HashMap<>();
for (Map.Entry<Integer, PartitionField> posToField : entry.getValue().entrySet()) {
posToValue.put(
posToField.getKey(),
((SerializableFunction<Comparable<?>, Comparable<?>>)
posToField.getValue().transform().bind(field.type()))
.apply(expireValue));
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
}
expirePartitionValue.put(entry.getKey(), posToValue);
}
return expirePartitionValue;
}

private Comparable<?> getPartitionUpperBound(
DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) {
switch (field.type().typeId()) {
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
// expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds
case TIMESTAMP:
return expireTimestamp * 1000;
case LONG:
if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_MS)) {
return expireTimestamp;
} else if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_S)) {
return expireTimestamp / 1000;
} else {
throw new IllegalArgumentException(
"Number dateformat: " + expirationConfig.getNumberDateFormat());
}
case STRING:
return LocalDateTime.ofInstant(
Instant.ofEpochMilli(expireTimestamp), getDefaultZoneId(field))
.format(
DateTimeFormatter.ofPattern(
expirationConfig.getDateTimePattern(), Locale.getDefault()));
default:
throw new IllegalArgumentException(
"Unsupported expiration field type: " + field.type().typeId());
}
}

/**
* Create a filter expression for expired files for the `FILE` level. For the `PARTITION` level,
* we need to collect the oldest files to determine if the partition is obsolete, so we will not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,12 @@ private void testUnKeyedPartitionLevel() {

List<Record> expected;
if (tableTestHelper().partitionSpec().isPartitioned()) {
if (expireByStringDate()) {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"));
} else {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"),
createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"),
createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00"));
}
// retention time is 1 day, expire partitions that are older than 2022-01-02
expected =
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), "2022-01-03T12:00:00"),
createRecord(3, "333", parseMillis("2022-01-02T12:00:00"), "2022-01-02T12:00:00"),
createRecord(4, "444", parseMillis("2022-01-02T19:00:00"), "2022-01-02T19:00:00"));
} else {
expected =
Lists.newArrayList(
lintingbin marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading