Steven's second round of comments
Peter Vary committed Sep 18, 2024
1 parent 5a1516c commit ea3366e
Showing 18 changed files with 537 additions and 779 deletions.
ExpireSnapshots.java
@@ -16,31 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.stream;
package org.apache.iceberg.flink.maintenance.api;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor;
import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
import org.apache.iceberg.flink.maintenance.operator.TaskResult;
import org.apache.iceberg.flink.maintenance.operator.Trigger;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/** Deletes expired snapshots and the corresponding files. */
public class ExpireSnapshots {
private static final long DELETE_INITIAL_DELAY_MS = 10L;
private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
private static final long DELETE_TIMEOUT_MS = 10000L;
private static final int DELETE_ATTEMPT_NUM = 10;
private static final String EXECUTOR_TASK_NAME = "ES Executor";
@VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";
private static final int DELETE_BATCH_SIZE_DEFAULT = 10;
private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot";
@VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file";

private ExpireSnapshots() {
// Do not instantiate directly
@@ -53,64 +48,59 @@ public static Builder builder() {

public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
private Duration maxSnapshotAge = null;
private Integer retainLast = null;
private Integer numSnapshots = null;
private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT;
private int deleteParallelism = 1;

/**
* The snapshots newer than this age will not be removed.
* The snapshots older than this age will be removed.
*
* @param newMaxSnapshotAge of the snapshots to be removed
* @return for chained calls
*/
public Builder maxSnapshotAge(Duration newMaxSnapshotAge) {
this.maxSnapshotAge = newMaxSnapshotAge;
return this;
}

/**
* The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more details description see
* {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}.
The minimum number of {@link Snapshot}s to retain. For a more detailed description see {@link
* org.apache.iceberg.ExpireSnapshots#retainLast(int)}.
*
* @param newRetainLast number of snapshots to retain
* @return for chained calls
* @param newNumSnapshots number of snapshots to retain
*/
public Builder retainLast(int newRetainLast) {
this.retainLast = newRetainLast;
public Builder retainLast(int newNumSnapshots) {
this.numSnapshots = newNumSnapshots;
return this;
}

/**
* The worker pool size used to calculate the files to delete.
*
* @param newPlanningWorkerPoolSize for planning files to delete
* @return for chained calls
*/
public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) {
this.planningWorkerPoolSize = newPlanningWorkerPoolSize;
return this;
}

/**
* The number of retries on the failed delete attempts.
Size of the batch used when deleting the files.
*
* @param newDeleteAttemptNum number of retries
* @return for chained calls
* @param newDeleteBatchSize batch size used when deleting files
*/
public Builder deleteAttemptNum(int newDeleteAttemptNum) {
this.deleteAttemptNum = newDeleteAttemptNum;
public Builder deleteBatchSize(int newDeleteBatchSize) {
this.deleteBatchSize = newDeleteBatchSize;
return this;
}

/**
* The worker pool size used for deleting files.
The number of subtasks that perform the deletes.
*
* @param newDeleteWorkerPoolSize for scanning
* @return for chained calls
* @param newDeleteParallelism number of subtasks used for the deletes
*/
public Builder deleteWorkerPoolSize(int newDeleteWorkerPoolSize) {
this.deleteWorkerPoolSize = newDeleteWorkerPoolSize;
public Builder deleteParallelism(int newDeleteParallelism) {
this.deleteParallelism = newDeleteParallelism;
return this;
}
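
The new deleteBatchSize and deleteParallelism settings feed the DeleteFilesProcessor operator that the next hunk wires in. The implementation of that operator is not part of this excerpt; the following is a rough sketch of what a batching delete operator could look like, with illustrative names and a direct FileIO dependency (the real operator is constructed with a task name, a TableLoader and the batch size):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.io.FileIO;

// Sketch only - not the actual DeleteFilesProcessor from this commit.
class BatchingDeleteSketch extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<String, Void> {

  private final FileIO io;
  private final int batchSize;
  private final List<String> batch = new ArrayList<>();

  BatchingDeleteSketch(FileIO io, int batchSize) {
    this.io = io;
    this.batchSize = batchSize;
  }

  @Override
  public void processElement(StreamRecord<String> element) {
    // Buffer incoming file paths; delete once a full batch is collected
    batch.add(element.getValue());
    if (batch.size() >= batchSize) {
      flush();
    }
  }

  @Override
  public void finish() throws Exception {
    flush(); // delete whatever is left before the operator shuts down
    super.finish();
  }

  private void flush() {
    batch.forEach(io::deleteFile);
    batch.clear();
  }
}

Compared to the removed AsyncDataStream path, failures surface synchronously in the operator itself instead of being handled by an async retry strategy.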

@@ -124,36 +114,26 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
new ExpireSnapshotsProcessor(
tableLoader(),
maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
retainLast,
numSnapshots,
planningWorkerPoolSize))
.name(EXECUTOR_TASK_NAME)
.uid("expire-snapshots-" + uidSuffix())
.name(EXECUTOR_OPERATOR_NAME)
.uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();

AsyncRetryStrategy<Boolean> retryStrategy =
new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder<Boolean>(
deleteAttemptNum,
DELETE_INITIAL_DELAY_MS,
DELETE_MAX_RETRY_DELAY_MS,
DELETE_BACKOFF_MULTIPLIER)
.ifResult(AsyncDeleteFiles.FAILED_PREDICATE)
.build();

AsyncDataStream.unorderedWaitWithRetry(
result.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM).rebalance(),
new AsyncDeleteFiles(name(), tableLoader(), deleteWorkerPoolSize),
DELETE_TIMEOUT_MS,
TimeUnit.MILLISECONDS,
deleteWorkerPoolSize,
retryStrategy)
.name(DELETE_FILES_TASK_NAME)
.uid("delete-expired-files-" + uidSuffix())
result
.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
.rebalance()
.transform(
DELETE_FILES_OPERATOR_NAME,
TypeInformation.of(Void.class),
new DeleteFilesProcessor(name(), tableLoader(), deleteBatchSize))
.name(DELETE_FILES_OPERATOR_NAME)
.uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());
.setParallelism(deleteParallelism);

// Deleting the files is asynchronous, so we ignore the results when calculating the return
// value
// Ignore the file deletion result and return the DataStream<TaskResult> directly
return result;
}
}
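
Putting the renamed pieces together, below is a minimal usage sketch of the reworked builder. The setters mirror the ones visible in this diff; only the standalone wrapper is an assumption, since the component that consumes the finished builder (and eventually calls the package-private append()) is outside this excerpt.

import java.time.Duration;
import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;

public class ExpireSnapshotsUsageSketch {
  public static void main(String[] args) {
    ExpireSnapshots.Builder builder =
        ExpireSnapshots.builder()
            .maxSnapshotAge(Duration.ofDays(7)) // snapshots older than this are removed
            .retainLast(10)                     // but keep at least the last 10 snapshots
            .planningWorkerPoolSize(4)          // threads planning the files to delete
            .deleteBatchSize(100)               // files per delete batch (default is 10)
            .deleteParallelism(2)               // subtasks running DeleteFilesProcessor
            .scheduleOnCommitCount(50)          // inherited trigger: run every 50 commits
            .uidSuffix("expire-snapshots");     // inherited: keeps operator uids stable
  }
}

Note that the delete parallelism is now an explicit, task-specific knob (deleteParallelism, used by .setParallelism(deleteParallelism) above) instead of reusing the builder-wide parallelism() value as the removed .setParallelism(parallelism()) line did.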
MaintenanceTaskBuilder.java
@@ -16,10 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.stream;
package org.apache.iceberg.flink.maintenance.api;

import java.time.Duration;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.iceberg.flink.TableLoader;
@@ -28,7 +29,8 @@
import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> {
@PublicEvolving
abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> {
private int index;
private String name;
private TableLoader tableLoader;
@@ -43,7 +45,6 @@ public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> {
* After a given number of Iceberg table commits since the last run, starts the downstream job.
*
* @param commitCount after the downstream job should be started
* @return for chained calls
*/
public T scheduleOnCommitCount(int commitCount) {
triggerEvaluator.commitCount(commitCount);
@@ -54,7 +55,6 @@ public T scheduleOnCommitCount(int commitCount) {
* After a given number of new data files since the last run, starts the downstream job.
*
* @param dataFileCount after the downstream job should be started
* @return for chained calls
*/
public T scheduleOnDataFileCount(int dataFileCount) {
triggerEvaluator.dataFileCount(dataFileCount);
@@ -65,7 +65,6 @@ public T scheduleOnDataFileCount(int dataFileCount) {
* After a given aggregated data file size since the last run, starts the downstream job.
*
* @param dataFileSizeInBytes after the downstream job should be started
* @return for chained calls
*/
public T scheduleOnDataFileSize(long dataFileSizeInBytes) {
triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes);
@@ -77,7 +76,6 @@ public T scheduleOnDataFileSize(long dataFileSizeInBytes) {
* job.
*
* @param posDeleteFileCount after the downstream job should be started
* @return for chained calls
*/
public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) {
triggerEvaluator.posDeleteFileCount(posDeleteFileCount);
@@ -89,7 +87,6 @@ public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) {
* job.
*
* @param posDeleteRecordCount after the downstream job should be started
* @return for chained calls
*/
public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) {
triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount);
@@ -101,7 +98,6 @@ public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) {
* job.
*
* @param eqDeleteFileCount after the downstream job should be started
* @return for chained calls
*/
public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) {
triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount);
@@ -113,7 +109,6 @@ public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) {
* job.
*
* @param eqDeleteRecordCount after the downstream job should be started
* @return for chained calls
*/
public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) {
triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount);
@@ -124,7 +119,6 @@ public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) {
* After a given time since the last run, starts the downstream job.
*
* @param interval after the downstream job should be started
* @return for chained calls
*/
public T scheduleOnInterval(Duration interval) {
triggerEvaluator.timeout(interval);
@@ -135,7 +129,6 @@ public T scheduleOnInterval(Duration interval) {
* The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
*
* @param newUidSuffix for the transformations
* @return for chained calls
*/
public T uidSuffix(String newUidSuffix) {
this.uidSuffix = newUidSuffix;
@@ -147,7 +140,6 @@ public T uidSuffix(String newUidSuffix) {
* generated stream. Could be used to separate the resources used by this task.
*
* @param newSlotSharingGroup to be used for the operators
* @return for chained calls
*/
public T slotSharingGroup(String newSlotSharingGroup) {
this.slotSharingGroup = newSlotSharingGroup;
@@ -158,15 +150,15 @@ public T slotSharingGroup(String newSlotSharingGroup) {
* Sets the parallelism for the stream.
*
* @param newParallelism the required parallelism
* @return for chained calls
*/
public T parallelism(int newParallelism) {
Preconditions.checkArgument(newParallelism > 0, "Parallelism should be greater than 0");
this.parallelism = newParallelism;
return (T) this;
}

@Internal
int id() {
int index() {
return index;
}

@@ -190,8 +182,7 @@ String slotSharingGroup() {
return slotSharingGroup;
}

@Internal
Integer parallelism() {
protected Integer parallelism() {
return parallelism;
}

@@ -204,19 +195,16 @@ TriggerEvaluator evaluator() {
DataStream<TaskResult> append(
DataStream<Trigger> sourceStream,
int maintenanceTaskIndex,
String maintainanceTaskName,
String maintenanceTaskName,
TableLoader newTableLoader,
String mainUidSuffix,
String mainSlotSharingGroup,
int mainParallelism) {
Preconditions.checkArgument(
parallelism == null || parallelism == -1 || parallelism > 0,
"Parallelism should be left to default (-1/null) or greater than 0");
Preconditions.checkNotNull(maintainanceTaskName, "Name should not be null");
Preconditions.checkNotNull(maintenanceTaskName, "Name should not be null");
Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null");

this.index = maintenanceTaskIndex;
this.name = maintainanceTaskName;
this.name = maintenanceTaskName;
this.tableLoader = newTableLoader;

if (uidSuffix == null) {
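
All of the setters above return (T) this, relying on the self-referencing type parameter so that a subclass builder such as ExpireSnapshots.Builder keeps its concrete type across chained calls. A minimal standalone sketch of the pattern, with hypothetical names:

// Illustrative names only - this shows the generic pattern used by
// MaintenanceTaskBuilder, not actual Iceberg classes.
abstract class TaskBuilder<T extends TaskBuilder<T>> {
  private String uidSuffix;

  @SuppressWarnings("unchecked")
  public T uidSuffix(String newUidSuffix) {
    this.uidSuffix = newUidSuffix;
    return (T) this; // safe as long as each subclass binds T to itself
  }
}

class ExpireBuilder extends TaskBuilder<ExpireBuilder> {
  private int numSnapshots;

  public ExpireBuilder retainLast(int newNumSnapshots) {
    this.numSnapshots = newNumSnapshots;
    return this;
  }
}

class Demo {
  public static void main(String[] args) {
    // The base-class setter returns ExpireBuilder, so subclass setters can
    // follow it anywhere in the chain:
    ExpireBuilder builder = new ExpireBuilder().uidSuffix("x").retainLast(3);
  }
}

The commit declares the bound as the raw type MaintenanceTaskBuilder rather than MaintenanceTaskBuilder<T>; both compile, but the parameterized bound shown here is the conventional spelling of the pattern.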