Skip to content

Commit

Permalink
Flink: Port #11144 to v1.19 (#11473)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvary authored Nov 8, 2024
1 parent 5c8a5d6 commit 3da64d3
Show file tree
Hide file tree
Showing 34 changed files with 2,527 additions and 552 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.api;

import java.time.Duration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor;
import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/** Deletes expired snapshots and the corresponding files. */
public class ExpireSnapshots {
private static final int DELETE_BATCH_SIZE_DEFAULT = 1000;
private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot";
@VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file";

private ExpireSnapshots() {}

/** Creates the builder for creating a stream which expires snapshots for the table. */
public static Builder builder() {
return new Builder();
}

public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
private Duration maxSnapshotAge = null;
private Integer numSnapshots = null;
private Integer planningWorkerPoolSize;
private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT;

/**
* The snapshots older than this age will be removed.
*
* @param newMaxSnapshotAge of the snapshots to be removed
*/
public Builder maxSnapshotAge(Duration newMaxSnapshotAge) {
this.maxSnapshotAge = newMaxSnapshotAge;
return this;
}

/**
* The minimum number of {@link Snapshot}s to retain. For more details description see {@link
* org.apache.iceberg.ExpireSnapshots#retainLast(int)}.
*
* @param newNumSnapshots number of snapshots to retain
*/
public Builder retainLast(int newNumSnapshots) {
this.numSnapshots = newNumSnapshots;
return this;
}

/**
* The worker pool size used to calculate the files to delete. If not set, the shared worker
* pool is used.
*
* @param newPlanningWorkerPoolSize for planning files to delete
*/
public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) {
this.planningWorkerPoolSize = newPlanningWorkerPoolSize;
return this;
}

/**
* Size of the batch used to deleting the files.
*
* @param newDeleteBatchSize used for deleting
*/
public Builder deleteBatchSize(int newDeleteBatchSize) {
this.deleteBatchSize = newDeleteBatchSize;
return this;
}

@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");

SingleOutputStreamOperator<TaskResult> result =
trigger
.process(
new ExpireSnapshotsProcessor(
tableLoader(),
maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
numSnapshots,
planningWorkerPoolSize))
.name(operatorName(EXECUTOR_OPERATOR_NAME))
.uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.forceNonParallel();

result
.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
.rebalance()
.transform(
operatorName(DELETE_FILES_OPERATOR_NAME),
TypeInformation.of(Void.class),
new DeleteFilesProcessor(
index(), taskName(), tableLoader().loadTable(), deleteBatchSize))
.uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());

// Ignore the file deletion result and return the DataStream<TaskResult> directly
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.operator;
package org.apache.iceberg.flink.maintenance.api;

import java.io.IOException;
import java.sql.DatabaseMetaData;
Expand All @@ -38,10 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* JDBC table backed implementation of the {@link
* org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory}.
*/
/** JDBC table backed implementation of the {@link TriggerLockFactory}. */
public class JdbcLockFactory implements TriggerLockFactory {
private static final Logger LOG = LoggerFactory.getLogger(JdbcLockFactory.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.api;

import java.time.Duration;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@Experimental
@SuppressWarnings("unchecked")
public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>> {
private int index;
private String taskName;
private String tableName;
private TableLoader tableLoader;
private String uidSuffix = null;
private String slotSharingGroup = null;
private Integer parallelism = null;
private final TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder();

abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream);

/**
* After a given number of Iceberg table commits since the last run, starts the downstream job.
*
* @param commitCount after the downstream job should be started
*/
public T scheduleOnCommitCount(int commitCount) {
triggerEvaluator.commitCount(commitCount);
return (T) this;
}

/**
* After a given number of new data files since the last run, starts the downstream job.
*
* @param dataFileCount after the downstream job should be started
*/
public T scheduleOnDataFileCount(int dataFileCount) {
triggerEvaluator.dataFileCount(dataFileCount);
return (T) this;
}

/**
* After a given aggregated data file size since the last run, starts the downstream job.
*
* @param dataFileSizeInBytes after the downstream job should be started
*/
public T scheduleOnDataFileSize(long dataFileSizeInBytes) {
triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes);
return (T) this;
}

/**
* After a given number of new positional delete files since the last run, starts the downstream
* job.
*
* @param posDeleteFileCount after the downstream job should be started
*/
public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) {
triggerEvaluator.posDeleteFileCount(posDeleteFileCount);
return (T) this;
}

/**
* After a given number of new positional delete records since the last run, starts the downstream
* job.
*
* @param posDeleteRecordCount after the downstream job should be started
*/
public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) {
triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount);
return (T) this;
}

/**
* After a given number of new equality delete files since the last run, starts the downstream
* job.
*
* @param eqDeleteFileCount after the downstream job should be started
*/
public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) {
triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount);
return (T) this;
}

/**
* After a given number of new equality delete records since the last run, starts the downstream
* job.
*
* @param eqDeleteRecordCount after the downstream job should be started
*/
public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) {
triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount);
return (T) this;
}

/**
* After a given time since the last run, starts the downstream job.
*
* @param interval after the downstream job should be started
*/
public T scheduleOnInterval(Duration interval) {
triggerEvaluator.timeout(interval);
return (T) this;
}

/**
* The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
*
* @param newUidSuffix for the transformations
*/
public T uidSuffix(String newUidSuffix) {
this.uidSuffix = newUidSuffix;
return (T) this;
}

/**
* The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the
* generated stream. Could be used to separate the resources used by this task.
*
* @param newSlotSharingGroup to be used for the operators
*/
public T slotSharingGroup(String newSlotSharingGroup) {
this.slotSharingGroup = newSlotSharingGroup;
return (T) this;
}

/**
* Sets the parallelism for the stream.
*
* @param newParallelism the required parallelism
*/
public T parallelism(int newParallelism) {
OperatorValidationUtils.validateParallelism(newParallelism);
this.parallelism = newParallelism;
return (T) this;
}

protected int index() {
return index;
}

protected String taskName() {
return taskName;
}

protected String tableName() {
return tableName;
}

protected TableLoader tableLoader() {
return tableLoader;
}

protected String uidSuffix() {
return uidSuffix;
}

protected String slotSharingGroup() {
return slotSharingGroup;
}

protected Integer parallelism() {
return parallelism;
}

protected String operatorName(String operatorNameBase) {
return operatorNameBase + "[" + index() + "]";
}

TriggerEvaluator evaluator() {
return triggerEvaluator.build();
}

DataStream<TaskResult> append(
DataStream<Trigger> sourceStream,
int taskIndex,
String newTaskName,
String newTableName,
TableLoader newTableLoader,
String defaultUidSuffix,
String defaultSlotSharingGroup,
int defaultParallelism) {
Preconditions.checkNotNull(newTaskName, "Task name should not be null");
Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null");

this.index = taskIndex;
this.taskName = newTaskName;
this.tableName = newTableName;
this.tableLoader = newTableLoader;

if (uidSuffix == null) {
uidSuffix = this.taskName + "_" + index + "_" + defaultUidSuffix;
}

if (parallelism == null) {
parallelism = defaultParallelism;
}

if (slotSharingGroup == null) {
slotSharingGroup = defaultSlotSharingGroup;
}

return append(sourceStream);
}
}
Loading

0 comments on commit 3da64d3

Please sign in to comment.