diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java new file mode 100644 index 000000000000..9cde5cb173e1 --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.time.Duration; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Deletes expired snapshots and the corresponding files. */ +public class ExpireSnapshots { + private static final int DELETE_BATCH_SIZE_DEFAULT = 1000; + private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot"; + @VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file"; + + private ExpireSnapshots() {} + + /** Creates the builder for creating a stream which expires snapshots for the table. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> { + private Duration maxSnapshotAge = null; + private Integer numSnapshots = null; + private Integer planningWorkerPoolSize; + private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT; + + /** + * Snapshots older than this age will be removed. + * + * @param newMaxSnapshotAge of the snapshots to be removed + */ + public Builder maxSnapshotAge(Duration newMaxSnapshotAge) { + this.maxSnapshotAge = newMaxSnapshotAge; + return this; + } + + /** + * The minimum number of {@link Snapshot}s to retain. For a more detailed description see {@link + * org.apache.iceberg.ExpireSnapshots#retainLast(int)}. + * + * @param newNumSnapshots number of snapshots to retain + */ + public Builder retainLast(int newNumSnapshots) { + this.numSnapshots = newNumSnapshots; + return this; + } + + /** + * The worker pool size used to calculate the files to delete. If not set, the shared worker + * pool is used.
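For context, a minimal sketch of how this builder is meant to be wired into a maintenance stream, using only the API added in this patch; the env, tableLoader, and lockFactory objects are assumed to exist already:

    // Hypothetical wiring: expire snapshots after every 10th commit.
    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .add(
            ExpireSnapshots.builder()
                .maxSnapshotAge(Duration.ofDays(7)) // drop snapshots older than a week...
                .retainLast(3)                      // ...but always keep the last three
                .deleteBatchSize(500)               // bulk-delete files in batches of 500
                .scheduleOnCommitCount(10))
        .append();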
+ * + * @param newPlanningWorkerPoolSize for planning files to delete + */ + public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) { + this.planningWorkerPoolSize = newPlanningWorkerPoolSize; + return this; + } + + /** + * Size of the batch used when deleting the files. + * + * @param newDeleteBatchSize used for deleting + */ + public Builder deleteBatchSize(int newDeleteBatchSize) { + this.deleteBatchSize = newDeleteBatchSize; + return this; + } + + @Override + DataStream<TaskResult> append(DataStream<Trigger> trigger) { + Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null"); + + SingleOutputStreamOperator<TaskResult> result = + trigger + .process( + new ExpireSnapshotsProcessor( + tableLoader(), + maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(), + numSnapshots, + planningWorkerPoolSize)) + .name(operatorName(EXECUTOR_OPERATOR_NAME)) + .uid(EXECUTOR_OPERATOR_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + result + .getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM) + .rebalance() + .transform( + operatorName(DELETE_FILES_OPERATOR_NAME), + TypeInformation.of(Void.class), + new DeleteFilesProcessor( + index(), taskName(), tableLoader().loadTable(), deleteBatchSize)) + .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .setParallelism(parallelism()); + + // Ignore the file deletion result and return the DataStream directly + return result; + } + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java similarity index 98% rename from flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java rename to flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java index 085fbfecd270..9f4fb069aae8 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/JdbcLockFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/JdbcLockFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.flink.maintenance.operator; +package org.apache.iceberg.flink.maintenance.api; import java.io.IOException; import java.sql.DatabaseMetaData; @@ -38,10 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * JDBC table backed implementation of the {@link - * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory}. - */ +/** JDBC table backed implementation of the {@link TriggerLockFactory}. */ public class JdbcLockFactory implements TriggerLockFactory { private static final Logger LOG = LoggerFactory.getLogger(JdbcLockFactory.class); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java new file mode 100644 index 000000000000..3fc431d02547 --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.time.Duration; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +@Experimental +@SuppressWarnings("unchecked") +public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>> { + private int index; + private String taskName; + private String tableName; + private TableLoader tableLoader; + private String uidSuffix = null; + private String slotSharingGroup = null; + private Integer parallelism = null; + private final TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder(); + + abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream); + + /** + * After a given number of Iceberg table commits since the last run, starts the downstream job. + * + * @param commitCount after the downstream job should be started + */ + public T scheduleOnCommitCount(int commitCount) { + triggerEvaluator.commitCount(commitCount); + return (T) this; + } + + /** + * After a given number of new data files since the last run, starts the downstream job. + * + * @param dataFileCount after the downstream job should be started + */ + public T scheduleOnDataFileCount(int dataFileCount) { + triggerEvaluator.dataFileCount(dataFileCount); + return (T) this; + } + + /** + * After a given aggregated data file size since the last run, starts the downstream job. + * + * @param dataFileSizeInBytes after the downstream job should be started + */ + public T scheduleOnDataFileSize(long dataFileSizeInBytes) { + triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes); + return (T) this; + } + + /** + * After a given number of new positional delete files since the last run, starts the downstream + * job. + * + * @param posDeleteFileCount after the downstream job should be started + */ + public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) { + triggerEvaluator.posDeleteFileCount(posDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new positional delete records since the last run, starts the downstream + * job. + * + * @param posDeleteRecordCount after the downstream job should be started + */ + public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) { + triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount); + return (T) this; + } + + /** + * After a given number of new equality delete files since the last run, starts the downstream + * job.
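The scheduleOn* methods above can be combined on a single task; a short sketch, assuming the resulting TriggerEvaluator fires as soon as any one of the configured thresholds is reached:

    // Hypothetical schedule: whichever condition is met first starts the run.
    ExpireSnapshots.builder()
        .scheduleOnCommitCount(50)                  // 50 commits, or
        .scheduleOnDataFileSize(512L * 1024 * 1024) // 512 MB of new data files, or
        .scheduleOnInterval(Duration.ofHours(6));   // 6 hours since the last run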
+ * + * @param eqDeleteFileCount after the downstream job should be started + */ + public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) { + triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new equality delete records since the last run, starts the downstream + * job. + * + * @param eqDeleteRecordCount after the downstream job should be started + */ + public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) { + triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount); + return (T) this; + } + + /** + * After a given time since the last run, starts the downstream job. + * + * @param interval after the downstream job should be started + */ + public T scheduleOnInterval(Duration interval) { + triggerEvaluator.timeout(interval); + return (T) this; + } + + /** + * The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. + * + * @param newUidSuffix for the transformations + */ + public T uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return (T) this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + */ + public T slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return (T) this; + } + + /** + * Sets the parallelism for the stream. + * + * @param newParallelism the required parallelism + */ + public T parallelism(int newParallelism) { + OperatorValidationUtils.validateParallelism(newParallelism); + this.parallelism = newParallelism; + return (T) this; + } + + protected int index() { + return index; + } + + protected String taskName() { + return taskName; + } + + protected String tableName() { + return tableName; + } + + protected TableLoader tableLoader() { + return tableLoader; + } + + protected String uidSuffix() { + return uidSuffix; + } + + protected String slotSharingGroup() { + return slotSharingGroup; + } + + protected Integer parallelism() { + return parallelism; + } + + protected String operatorName(String operatorNameBase) { + return operatorNameBase + "[" + index() + "]"; + } + + TriggerEvaluator evaluator() { + return triggerEvaluator.build(); + } + + DataStream<TaskResult> append( + DataStream<Trigger> sourceStream, + int taskIndex, + String newTaskName, + String newTableName, + TableLoader newTableLoader, + String defaultUidSuffix, + String defaultSlotSharingGroup, + int defaultParallelism) { + Preconditions.checkNotNull(newTaskName, "Task name should not be null"); + Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null"); + + this.index = taskIndex; + this.taskName = newTaskName; + this.tableName = newTableName; + this.tableLoader = newTableLoader; + + if (uidSuffix == null) { + uidSuffix = this.taskName + "_" + index + "_" + defaultUidSuffix; + } + + if (parallelism == null) { + parallelism = defaultParallelism; + } + + if (slotSharingGroup == null) { + slotSharingGroup = defaultSlotSharingGroup; + } + + return append(sourceStream); + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java new file mode 100644 index 000000000000..f931a9127137 --- /dev/null +++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.flink.sink.IcebergSink; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. */ +public class TableMaintenance { + static final String SOURCE_OPERATOR_NAME_PREFIX = "Monitor source for "; + static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager"; + static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner"; + static final String FILTER_OPERATOR_NAME_PREFIX = "Filter "; + static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover"; + + private TableMaintenance() {} + + /** + * Use when the change stream is already provided, like in the {@link + * IcebergSink#addPostCommitTopology(DataStream)}. 
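A standalone job would use forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory) below instead; a minimal sketch, where the table location, JDBC URI, lock id, and the shape of the JdbcLockFactory constructor arguments are assumptions for illustration:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/table");
    TriggerLockFactory lockFactory =
        new JdbcLockFactory(
            "jdbc:postgresql://localhost:5432/iceberg", // assumed JDBC URI
            "db.table",                                 // assumed lock id, unique per table
            Maps.newHashMap());                         // assumed connection properties
    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .rateLimit(Duration.ofMinutes(10))
        .add(ExpireSnapshots.builder().scheduleOnCommitCount(10))
        .append();
    env.execute("Maintenance job");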
+ * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + @Internal + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(null, changeStream, tableLoader, lockFactory); + } + + /** + * Use this for a standalone maintenance job. It creates a monitor source that detects table + * changes and builds the maintenance pipelines afterwards. + * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forTable( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, null, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream<TableChange> inputStream; + private final TableLoader tableLoader; + private final List<MaintenanceTaskBuilder<?>> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidSuffix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMinutes(1); + private Duration lockCheckDelay = Duration.ofSeconds(30); + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private int maxReadBack = 100; + + private Builder( + StreamExecutionEnvironment env, + DataStream<TableChange> inputStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + this.env = env; + this.inputStream = inputStream; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + /** + * The suffix used for the generated {@link Transformation}'s uid. + * + * @param newUidSuffix for the transformations + */ + public Builder uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + */ + public Builder slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return this; + } + + /** + * Limits the firing frequency for the task triggers. + * + * @param newRateLimit firing frequency + */ + public Builder rateLimit(Duration newRateLimit) { + Preconditions.checkArgument( + newRateLimit.toMillis() > 0, "Rate limit should be greater than 0"); + this.rateLimit = newRateLimit; + return this; + } + + /** + * Sets the delay for checking lock availability when a concurrent run is detected.
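The Builder settings here act as graph-wide defaults that individual tasks may override; a sketch of the precedence, again with assumed env, tableLoader, and lockFactory objects:

    // The task-level parallelism(1) overrides the maintenance-level default of 2.
    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .parallelism(2)                  // default for every task in this graph
        .lockCheckDelay(Duration.ofSeconds(10))
        .add(
            ExpireSnapshots.builder()
                .parallelism(1)          // wins over the default above
                .scheduleOnInterval(Duration.ofHours(1)))
        .append();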
+ * + * @param newLockCheckDelay lock checking frequency + */ + public Builder lockCheckDelay(Duration newLockCheckDelay) { + this.lockCheckDelay = newLockCheckDelay; + return this; + } + + /** + * Sets the default parallelism of maintenance tasks. Could be overwritten by the {@link + * MaintenanceTaskBuilder#parallelism(int)}. + * + * @param newParallelism task parallelism + */ + public Builder parallelism(int newParallelism) { + OperatorValidationUtils.validateParallelism(newParallelism); + this.parallelism = newParallelism; + return this; + } + + /** + * Maximum number of snapshots checked when the job starts for the first time with an embedded + * {@link MonitorSource}. Only available when {@link + * TableMaintenance#forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory)} is + * used. + * + * @param newMaxReadBack snapshots to consider when initializing + */ + public Builder maxReadBack(int newMaxReadBack) { + Preconditions.checkArgument( + inputStream == null, "Can't set maxReadBack when change stream is provided"); + this.maxReadBack = newMaxReadBack; + return this; + } + + /** + * Adds a specific task with the given schedule. + * + * @param task to add + */ + public Builder add(MaintenanceTaskBuilder<?> task) { + taskBuilders.add(task); + return this; + } + + /** Builds the task graph for the maintenance tasks. */ + public void append() throws IOException { + Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task"); + Preconditions.checkNotNull(uidSuffix, "Uid suffix should not be null"); + + List<String> taskNames = Lists.newArrayListWithCapacity(taskBuilders.size()); + List<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(taskBuilders.size()); + for (int i = 0; i < taskBuilders.size(); ++i) { + taskNames.add(nameFor(taskBuilders.get(i), i)); + evaluators.add(taskBuilders.get(i).evaluator()); + } + + try (TableLoader loader = tableLoader.clone()) { + loader.open(); + String tableName = loader.loadTable().name(); + DataStream<Trigger> triggers = + DataStreamUtils.reinterpretAsKeyedStream( + changeStream(tableName, loader), unused -> true) + .process( + new TriggerManager( + loader, + lockFactory, + taskNames, + evaluators, + rateLimit.toMillis(), + lockCheckDelay.toMillis())) + .name(TRIGGER_MANAGER_OPERATOR_NAME) + .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel() + .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy()) + .name(WATERMARK_ASSIGNER_OPERATOR_NAME) + .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + + // Add the specific tasks + DataStream<TaskResult> unioned = null; + for (int i = 0; i < taskBuilders.size(); ++i) { + int taskIndex = i; + DataStream<Trigger> filtered = + triggers + .filter(t -> t.taskId() != null && t.taskId() == taskIndex) + .name(FILTER_OPERATOR_NAME_PREFIX + taskIndex) + .forceNonParallel() + .uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + MaintenanceTaskBuilder<?> builder = taskBuilders.get(taskIndex); + DataStream<TaskResult> result = + builder.append( + filtered, + taskIndex, + taskNames.get(taskIndex), + tableName, + loader, + uidSuffix, + slotSharingGroup, + parallelism); + if (unioned == null) { + unioned = result; + } else { + unioned = unioned.union(result); + } + } + + // Add the LockRemover to the end + unioned + .transform( + LOCK_REMOVER_OPERATOR_NAME, + TypeInformation.of(Void.class), + new LockRemover(tableName, lockFactory, taskNames)) + .forceNonParallel() +
.uid("lock-remover-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + } + } + + private DataStream changeStream(String tableName, TableLoader loader) { + if (inputStream == null) { + // Create a monitor source to provide the TableChange stream + MonitorSource source = + new MonitorSource( + loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack); + return env.fromSource( + source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME_PREFIX + tableName) + .uid(SOURCE_OPERATOR_NAME_PREFIX + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + } else { + return inputStream.global(); + } + } + + private static String nameFor(MaintenanceTaskBuilder streamBuilder, int taskIndex) { + return String.format( + "%s [%s]", streamBuilder.getClass().getSimpleName(), String.valueOf(taskIndex)); + } + } + + @Internal + public static class PunctuatedWatermarkStrategy implements WatermarkStrategy { + @Override + public WatermarkGenerator createWatermarkGenerator( + WatermarkGeneratorSupplier.Context context) { + return new WatermarkGenerator<>() { + @Override + public void onEvent(Trigger event, long eventTimestamp, WatermarkOutput output) { + output.emitWatermark(new Watermark(event.timestamp())); + } + + @Override + public void onPeriodicEmit(WatermarkOutput output) { + // No periodic watermarks + } + }; + } + + @Override + public TimestampAssigner createTimestampAssigner( + TimestampAssignerSupplier.Context context) { + return (element, unused) -> element.timestamp(); + } + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TaskResult.java similarity index 93% rename from flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java rename to flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TaskResult.java index 06f10f1c1d68..ca1462526f13 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TaskResult.java @@ -16,15 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.flink.maintenance.operator; +package org.apache.iceberg.flink.maintenance.api; +import java.io.Serializable; import java.util.List; -import org.apache.flink.annotation.Internal; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** The result of a single Maintenance Task. */ -@Internal -public class TaskResult { +public class TaskResult implements Serializable { private final int taskIndex; private final long startEpoch; private final boolean success; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/Trigger.java similarity index 67% rename from flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java rename to flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/Trigger.java index 85c6c8dbdd55..09209ba15153 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/Trigger.java @@ -16,47 +16,41 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.iceberg.flink.maintenance.operator; +package org.apache.iceberg.flink.maintenance.api; import org.apache.flink.annotation.Internal; -import org.apache.iceberg.SerializableTable; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -@Internal -class Trigger { +public class Trigger { private final long timestamp; - private final SerializableTable table; private final Integer taskId; private final boolean isRecovery; - private Trigger(long timestamp, SerializableTable table, Integer taskId, boolean isRecovery) { + private Trigger(long timestamp, Integer taskId, boolean isRecovery) { this.timestamp = timestamp; - this.table = table; this.taskId = taskId; this.isRecovery = isRecovery; } - static Trigger create(long timestamp, SerializableTable table, int taskId) { - return new Trigger(timestamp, table, taskId, false); + @Internal + public static Trigger create(long timestamp, int taskId) { + return new Trigger(timestamp, taskId, false); } - static Trigger recovery(long timestamp) { - return new Trigger(timestamp, null, null, true); + @Internal + public static Trigger recovery(long timestamp) { + return new Trigger(timestamp, null, true); } - long timestamp() { + public long timestamp() { return timestamp; } - SerializableTable table() { - return table; - } - - Integer taskId() { + public Integer taskId() { return taskId; } - boolean isRecovery() { + public boolean isRecovery() { return isRecovery; } @@ -64,7 +58,6 @@ boolean isRecovery() { public String toString() { return MoreObjects.toStringHelper(this) .add("timestamp", timestamp) - .add("table", table == null ? null : table.name()) .add("taskId", taskId) .add("isRecovery", isRecovery) .toString(); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TriggerLockFactory.java similarity index 86% rename from flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java rename to flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TriggerLockFactory.java index 329223d27ccf..c31381355efe 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TriggerLockFactory.java @@ -16,11 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.flink.maintenance.operator; +package org.apache.iceberg.flink.maintenance.api; import java.io.Closeable; import java.io.Serializable; import org.apache.flink.annotation.Experimental; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; /** Lock interface for handling locks for the Flink Table Maintenance jobs. */ @Experimental @@ -51,12 +53,11 @@ interface Lock { */ boolean isHeld(); - // TODO: Fix the link to the LockRemover when we have a final name and implementation /** * Releases the lock. Should not fail if the lock is not held by anyone. * - *

<p>Called by LockRemover. Implementations could assume that are no concurrent calls for this - * method. + * <p>Called by {@link LockRemover}. Implementations could assume that there are no concurrent calls + * for this method. */ void unlock(); } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java new file mode 100644 index 000000000000..dc7846c4c4d3 --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Set; +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Deletes the files using the {@link FileIO} which implements {@link SupportsBulkOperations}.
*/ +@Internal +public class DeleteFilesProcessor extends AbstractStreamOperator<Void> + implements OneInputStreamOperator<String, Void> { + private static final Logger LOG = LoggerFactory.getLogger(DeleteFilesProcessor.class); + + private final String taskIndex; + private final String taskName; + private final SupportsBulkOperations io; + private final String tableName; + private final Set<String> filesToDelete = Sets.newHashSet(); + private final int batchSize; + + private transient Counter failedCounter; + private transient Counter succeededCounter; + + public DeleteFilesProcessor(int taskIndex, String taskName, Table table, int batchSize) { + Preconditions.checkNotNull(taskName, "Task name should not be null"); + Preconditions.checkNotNull(table, "Table should not be null"); + + FileIO fileIO = table.io(); + Preconditions.checkArgument( + fileIO instanceof SupportsBulkOperations, + "%s doesn't support bulk delete", + fileIO.getClass().getSimpleName()); + + this.taskIndex = String.valueOf(taskIndex); + this.taskName = taskName; + this.io = (SupportsBulkOperations) fileIO; + this.tableName = table.name(); + this.batchSize = batchSize; + } + + @Override + public void open() throws Exception { + this.failedCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex) + .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER); + this.succeededCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex) + .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER); + } + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + if (element.isRecord()) { + filesToDelete.add(element.getValue()); + } + + if (filesToDelete.size() >= batchSize) { + deleteFiles(); + } + } + + @Override + public void processWatermark(Watermark mark) { + deleteFiles(); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) { + deleteFiles(); + } + + private void deleteFiles() { + try { + io.deleteFiles(filesToDelete); + LOG.info( + "Deleted {} files from table {} using bulk deletes", filesToDelete.size(), tableName); + succeededCounter.inc(filesToDelete.size()); + filesToDelete.clear(); + } catch (BulkDeletionFailureException e) { + int deletedFilesCount = filesToDelete.size() - e.numberFailedObjects(); + LOG.warn( + "Deleted only {} of {} files from table {} using bulk deletes", + deletedFilesCount, + filesToDelete.size(), + tableName, + e); + succeededCounter.inc(deletedFilesCount); + failedCounter.inc(e.numberFailedObjects()); + } + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java new file mode 100644 index 000000000000..a09d0244e95d --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.api.TaskResult; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could + * be removed to the {@link #DELETE_STREAM} side output. + */ +@Internal +public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> { + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class); + public static final OutputTag<String> DELETE_STREAM = + new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING); + + private final TableLoader tableLoader; + private final Long maxSnapshotAgeMs; + private final Integer numSnapshots; + private final Integer plannerPoolSize; + private transient ExecutorService plannerPool; + private transient Table table; + + public ExpireSnapshotsProcessor( + TableLoader tableLoader, + Long maxSnapshotAgeMs, + Integer numSnapshots, + Integer plannerPoolSize) { + Preconditions.checkNotNull(tableLoader, "Table loader should not be null"); + + this.tableLoader = tableLoader; + this.maxSnapshotAgeMs = maxSnapshotAgeMs; + this.numSnapshots = numSnapshots; + this.plannerPoolSize = plannerPoolSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + tableLoader.open(); + this.table = tableLoader.loadTable(); + this.plannerPool = + plannerPoolSize != null + ?
ThreadPools.newWorkerPool(table.name() + "-table-planner", plannerPoolSize) : ThreadPools.getWorkerPool(); } + + @Override + public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out) + throws Exception { + try { + table.refresh(); + ExpireSnapshots expireSnapshots = table.expireSnapshots(); + if (maxSnapshotAgeMs != null) { + expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - maxSnapshotAgeMs); + } + + if (numSnapshots != null) { + expireSnapshots = expireSnapshots.retainLast(numSnapshots); + } + + AtomicLong deleteFileCounter = new AtomicLong(0L); + expireSnapshots + .planWith(plannerPool) + .deleteWith( + file -> { + ctx.output(DELETE_STREAM, file); + deleteFileCounter.incrementAndGet(); + }) + .cleanExpiredFiles(true) + .commit(); + + LOG.info( + "Successfully finished expiring snapshots for {} at {}. Scheduled {} files for deletion.", + table, + ctx.timestamp(), + deleteFileCounter.get()); + out.collect( + new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList())); + } catch (Exception e) { + LOG.error("Failed to expire snapshots for {} at {}", table, ctx.timestamp(), e); + out.collect( + new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList(e))); + } + } + + @Override + public void close() throws Exception { + super.close(); + + tableLoader.close(); + if (plannerPoolSize != null) { + plannerPool.shutdown(); + } + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java index 3c3761ef2f4d..14d590162c8b 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemover.java @@ -26,6 +26,9 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.flink.maintenance.api.TaskResult; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.slf4j.Logger; @@ -64,6 +67,7 @@ public class LockRemover extends AbstractStreamOperator<Void> implements OneInputStreamOperator<TaskResult, Void> { private static final Logger LOG = LoggerFactory.getLogger(LockRemover.class); + private final String tableName; private final TriggerLockFactory lockFactory; private final List<String> maintenanceTaskNames; @@ -74,12 +78,14 @@ public class LockRemover extends AbstractStreamOperator<Void> private transient TriggerLockFactory.Lock recoveryLock; private transient long lastProcessedTaskStartEpoch = 0L; - public LockRemover(TriggerLockFactory lockFactory, List<String> maintenanceTaskNames) { + public LockRemover( + String tableName, TriggerLockFactory lockFactory, List<String> maintenanceTaskNames) { Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); Preconditions.checkArgument( maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(), "Invalid maintenance task names: null or empty"); + this.tableName = tableName; this.lockFactory = lockFactory; this.maintenanceTaskNames = maintenanceTaskNames; } @@ -91,22 +97,31 @@ public void open() throws Exception {
Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); this.failedTaskResultCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); this.taskLastRunDurationMs = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); - for (String name : maintenanceTaskNames) { + for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) { succeededTaskResultCounters.add( getRuntimeContext() .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) .counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER)); failedTaskResultCounters.add( getRuntimeContext() .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) .counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER)); AtomicLong duration = new AtomicLong(0); taskLastRunDurationMs.add(duration); getRuntimeContext() .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) .gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java index 89efffa15f16..d74b2349b1de 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java @@ -43,7 +43,7 @@ /** Monitors an Iceberg table for changes */ @Internal -class MonitorSource extends SingleThreadedIteratorSource { +public class MonitorSource extends SingleThreadedIteratorSource { private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class); private final TableLoader tableLoader; @@ -58,7 +58,7 @@ class MonitorSource extends SingleThreadedIteratorSource { * @param rateLimiterStrategy limits the frequency the table is checked * @param maxReadBack sets the number of snapshots read before stopping change collection */ - MonitorSource( + public MonitorSource( TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) { Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should no be null"); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java index 773b34b6c495..78be0a9c15d6 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java +++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java @@ -28,7 +28,7 @@ /** Event describing changes in an Iceberg table */ @Internal -class TableChange { +public class TableChange { private int dataFileCount; private long dataFileSizeInBytes; private int posDeleteFileCount; @@ -37,7 +37,7 @@ class TableChange { private long eqDeleteRecordCount; private int commitCount; - TableChange( + private TableChange( int dataFileCount, long dataFileSizeInBytes, int posDeleteFileCount, @@ -87,7 +87,7 @@ static TableChange empty() { return new TableChange(0, 0L, 0, 0L, 0, 0L, 0); } - static Builder builder() { + public static Builder builder() { return new Builder(); } @@ -115,7 +115,7 @@ long eqDeleteRecordCount() { return eqDeleteRecordCount; } - public int commitCount() { + int commitCount() { return commitCount; } @@ -183,7 +183,7 @@ public int hashCode() { commitCount); } - static class Builder { + public static class Builder { private int dataFileCount = 0; private long dataFileSizeInBytes = 0L; private int posDeleteFileCount = 0; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java index 1a04461aed43..6147c3a5fd16 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java @@ -19,8 +19,10 @@ package org.apache.iceberg.flink.maintenance.operator; public class TableMaintenanceMetrics { - public static final String GROUP_KEY = "maintenanceTask"; - public static final String GROUP_VALUE_DEFAULT = "maintenanceTask"; + public static final String GROUP_KEY = "maintenance"; + public static final String TASK_NAME_KEY = "taskName"; + public static final String TASK_INDEX_KEY = "taskIndex"; + public static final String TABLE_NAME_KEY = "tableName"; // TriggerManager metrics public static final String RATE_LIMITER_TRIGGERED = "rateLimiterTriggered"; @@ -33,6 +35,10 @@ public class TableMaintenanceMetrics { public static final String FAILED_TASK_COUNTER = "failedTasks"; public static final String LAST_RUN_DURATION_MS = "lastRunDurationMs"; + // DeleteFiles metrics + public static final String DELETE_FILE_FAILED_COUNTER = "deleteFailed"; + public static final String DELETE_FILE_SUCCEEDED_COUNTER = "deleteSucceeded"; + private TableMaintenanceMetrics() { // do not instantiate } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java index dba33b22a42a..d448898bdfe6 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory; @Internal -class TriggerEvaluator implements Serializable { +public class TriggerEvaluator implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(TriggerEvaluator.class); private final List predicates; @@ -50,7 +50,7 @@ boolean check(TableChange event, long lastTimeMs, long currentTimeMs) { return result; } - static class Builder implements Serializable { + public static class Builder implements Serializable { 
private Integer dataFileCount; private Long dataFileSizeInBytes; private Integer posDeleteFileCount; @@ -95,12 +95,12 @@ public Builder commitCount(int newCommitCount) { return this; } - Builder timeout(Duration newTimeout) { + public Builder timeout(Duration newTimeout) { this.timeout = newTimeout; return this; } - TriggerEvaluator build() { + public TriggerEvaluator build() { List predicates = Lists.newArrayList(); if (dataFileCount != null) { predicates.add((change, unused, unused2) -> change.dataFileCount() >= dataFileCount); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java index dc95b27af0a6..a96e99d94299 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -36,8 +35,9 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; -import org.apache.iceberg.SerializableTable; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.slf4j.Logger; @@ -57,11 +57,11 @@ * the timer functions are available, but the key is not used. 
*/ @Internal -class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> +public class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> implements CheckpointedFunction { private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); - private final TableLoader tableLoader; + private final String tableName; private final TriggerLockFactory lockFactory; private final List<String> maintenanceTaskNames; private final List<TriggerEvaluator> evaluators; @@ -89,7 +89,7 @@ class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> private transient int startsFrom = 0; private transient boolean triggered = false; - TriggerManager( + public TriggerManager( TableLoader tableLoader, TriggerLockFactory lockFactory, List<String> maintenanceTaskNames, @@ -110,7 +110,8 @@ class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> Preconditions.checkArgument( lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 ms."); - this.tableLoader = tableLoader; + tableLoader.open(); + this.tableName = tableLoader.loadTable().name(); this.lockFactory = lockFactory; this.maintenanceTaskNames = maintenanceTaskNames; this.evaluators = evaluators; @@ -123,30 +124,32 @@ public void open(Configuration parameters) throws Exception { this.rateLimiterTriggeredCounter = getRuntimeContext() .getMetricGroup() - .addGroup( - TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); this.concurrentRunThrottledCounter = getRuntimeContext() .getMetricGroup() - .addGroup( - TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) .counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED); this.nothingToTriggerCounter = getRuntimeContext() .getMetricGroup() - .addGroup( - TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); - this.triggerCounters = - maintenanceTaskNames.stream() - .map( - name -> - getRuntimeContext() - .getMetricGroup() - .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) - .counter(TableMaintenanceMetrics.TRIGGERED)) - .collect(Collectors.toList()); + this.triggerCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); + for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) { + triggerCounters.add( + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY) + .addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName) + .addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex)) + .addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex)) + .counter(TableMaintenanceMetrics.TRIGGERED)); + } this.nextEvaluationTimeState = getRuntimeContext() @@ -159,8 +162,6 @@ public void open(Configuration parameters) throws Exception { this.lastTriggerTimesState = getRuntimeContext() .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); - - tableLoader.open(); } @Override @@ -220,7 +221,6 @@ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) @Override public void close() throws IOException { - tableLoader.close(); lockFactory.close(); } @@ -256,10 +256,8 @@ private void checkAndFire(long
current, TimerService timerService, Collector + *

  <li>{@link StreamExecutionEnvironment} - environment for testing + *   <li>{@link ManualSource} - source for manually emitting {@link Trigger}s + *   <li>{@link DataStream} - which is generated from the {@link ManualSource} + *   <li>{@link CollectingSink} - which could be used to poll for the records emitted by the + * maintenance tasks + * + */ +class MaintenanceTaskInfraExtension implements BeforeEachCallback { + private StreamExecutionEnvironment env; + private ManualSource<Trigger> source; + private DataStream<Trigger> triggerStream; + private CollectingSink<TaskResult> sink; + + @Override + public void beforeEach(ExtensionContext context) { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + source = new ManualSource<>(env, TypeInformation.of(Trigger.class)); + // Adds the watermark to mimic the behaviour expected for the input of the maintenance tasks + triggerStream = + source + .dataStream() + .assignTimestampsAndWatermarks(new TableMaintenance.PunctuatedWatermarkStrategy()) + .name(IGNORED_OPERATOR_NAME) + .forceNonParallel(); + sink = new CollectingSink<>(); + } + + StreamExecutionEnvironment env() { + return env; + } + + ManualSource<Trigger> source() { + return source; + } + + DataStream<Trigger> triggerStream() { + return triggerStream; + } + + CollectingSink<TaskResult> sink() { + return sink; + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java new file mode 100644 index 000000000000..36041d9c3831 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskTestBase.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.flink.maintenance.api; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.function.Supplier; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.iceberg.flink.maintenance.operator.CollectingSink; +import org.apache.iceberg.flink.maintenance.operator.ManualSource; +import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.extension.RegisterExtension; + +class MaintenanceTaskTestBase extends OperatorTestBase { + private static final int TESTING_TASK_ID = 0; + private static final Duration POLL_DURATION = Duration.ofSeconds(5); + + @RegisterExtension MaintenanceTaskInfraExtension infra = new MaintenanceTaskInfraExtension(); + + void runAndWaitForSuccess( + StreamExecutionEnvironment env, + ManualSource triggerSource, + CollectingSink collectingSink, + Supplier waitForCondition) + throws Exception { + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // Do a single task run + long time = System.currentTimeMillis(); + triggerSource.sendRecord(Trigger.create(time, TESTING_TASK_ID), time); + + TaskResult result = collectingSink.poll(POLL_DURATION); + + assertThat(result.startEpoch()).isEqualTo(time); + assertThat(result.success()).isTrue(); + assertThat(result.taskIndex()).isEqualTo(TESTING_TASK_ID); + + Awaitility.await().until(waitForCondition::get); + } finally { + closeJobClient(jobClient); + } + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java new file mode 100644 index 000000000000..f80129f966e1 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
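
`runAndWaitForSuccess` above bundles the whole cycle: submit the job asynchronously, emit a single `Trigger`, poll the sink for the `TaskResult`, assert on it, and wait for a task-specific completion check. A sketch of the intended call from inside a subclass test; the condition supplier here is a stand-in, the concrete tests below use metric counters for this purpose:

```java
// Inside a test method of a MaintenanceTaskTestBase subclass.
runAndWaitForSuccess(
    infra.env(),
    infra.source(),
    infra.sink(),
    () -> Boolean.TRUE); // stand-in; real tests poll a metric counter here
```
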
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.apache.iceberg.flink.SimpleDataUtil.createRecord;
+import static org.apache.iceberg.flink.maintenance.api.ExpireSnapshots.DELETE_FILES_OPERATOR_NAME;
+import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER;
+import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Set;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TestExpireSnapshots extends MaintenanceTaskTestBase {
+  private Table table;
+
+  @BeforeEach
+  void before() {
+    MetricsReporterFactoryForTests.reset();
+    this.table = createTable();
+    tableLoader().open();
+  }
+
+  @Test
+  void testExpireSnapshots() throws Exception {
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+    insert(table, 3, "c");
+    insert(table, 4, "d");
+
+    Set<Snapshot> snapshots = Sets.newHashSet(table.snapshots());
+    assertThat(snapshots).hasSize(4);
+
+    ExpireSnapshots.builder()
+        .parallelism(1)
+        .planningWorkerPoolSize(2)
+        .deleteBatchSize(3)
+        .maxSnapshotAge(Duration.ZERO)
+        .retainLast(1)
+        .uidSuffix(UID_SUFFIX)
+        .append(
+            infra.triggerStream(),
+            0,
+            DUMMY_TASK_NAME,
+            DUMMY_TABLE_NAME,
+            tableLoader(),
+            "OTHER",
+            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            1)
+        .sinkTo(infra.sink());
+
+    runAndWaitForSuccess(
+        infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(table.name(), 3L));
+
+    table.refresh();
+    assertThat(Sets.newHashSet(table.snapshots())).hasSize(1);
+    // Check that the table data has not changed
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            createRecord(1, "a"),
+            createRecord(2, "b"),
+            createRecord(3, "c"),
+            createRecord(4, "d")));
+  }
+
+  @Test
+  void testFailure() throws Exception {
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    ExpireSnapshots.builder()
+        .append(
+            infra.triggerStream(),
+            0,
+            DUMMY_TASK_NAME,
+            DUMMY_TABLE_NAME,
+            tableLoader(),
+            UID_SUFFIX,
+            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            1)
+        .sinkTo(infra.sink());
+
+    JobClient jobClient = null;
+    try {
+      jobClient = infra.env().executeAsync();
+
+      // Do a single task run
+      long time = System.currentTimeMillis();
+      infra.source().sendRecord(Trigger.create(time, 1), time);
+
+      // First successful run (ensure that the operators are loaded/opened etc.)
+      assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue();
+
+      // Drop the table, so it will cause an exception
+      dropTable();
+
+      // Failed run
+      infra.source().sendRecord(Trigger.create(time + 1, 1), time + 1);
+
+      assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isFalse();
+    } finally {
+      closeJobClient(jobClient);
+    }
+
+    // Check the metrics. There are no expired snapshots or data files because ExpireSnapshots has
+    // no max age or number of snapshots set, so no files are removed.
+    MetricsReporterFactoryForTests.assertCounters(
+        new ImmutableMap.Builder<List<String>, Long>()
+            .put(
+                ImmutableList.of(
+                    DELETE_FILES_OPERATOR_NAME + "[0]",
+                    table.name(),
+                    DUMMY_TASK_NAME,
+                    "0",
+                    DELETE_FILE_FAILED_COUNTER),
+                0L)
+            .put(
+                ImmutableList.of(
+                    DELETE_FILES_OPERATOR_NAME + "[0]",
+                    table.name(),
+                    DUMMY_TASK_NAME,
+                    "0",
+                    DELETE_FILE_SUCCEEDED_COUNTER),
+                0L)
+            .build());
+  }
+
+  @Test
+  void testUidAndSlotSharingGroup() {
+    ExpireSnapshots.builder()
+        .slotSharingGroup(SLOT_SHARING_GROUP)
+        .uidSuffix(UID_SUFFIX)
+        .append(
+            infra.triggerStream(),
+            0,
+            DUMMY_TASK_NAME,
+            DUMMY_TABLE_NAME,
+            tableLoader(),
+            UID_SUFFIX,
+            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            1)
+        .sinkTo(infra.sink());
+
+    checkUidsAreSet(infra.env(), UID_SUFFIX);
+    checkSlotSharingGroupsAreSet(infra.env(), SLOT_SHARING_GROUP);
+  }
+
+  @Test
+  void testUidAndSlotSharingGroupUnset() {
+    ExpireSnapshots.builder()
+        .append(
+            infra.triggerStream(),
+            0,
+            DUMMY_TASK_NAME,
+            DUMMY_TABLE_NAME,
+            tableLoader(),
+            UID_SUFFIX,
+            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            1)
+        .sinkTo(infra.sink());
+
+    checkUidsAreSet(infra.env(), null);
+    checkSlotSharingGroupsAreSet(infra.env(), null);
+  }
+
+  @Test
+  void testMetrics() throws Exception {
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+
+    ExpireSnapshots.builder()
+        .maxSnapshotAge(Duration.ZERO)
+        .retainLast(1)
+        .parallelism(1)
+        .append(
+            infra.triggerStream(),
+            0,
+            DUMMY_TASK_NAME,
+            DUMMY_TABLE_NAME,
+            tableLoader(),
+            UID_SUFFIX,
+            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            1)
+        .sinkTo(infra.sink());
+
+    runAndWaitForSuccess(
+        infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(table.name(), 1L));
+
+    // Check the metrics
+    Awaitility.await()
+        .untilAsserted(
+            () ->
+                MetricsReporterFactoryForTests.assertCounters(
+                    new ImmutableMap.Builder<List<String>, Long>()
+                        .put(
+                            ImmutableList.of(
+                                DELETE_FILES_OPERATOR_NAME + "[0]",
+                                table.name(),
+                                DUMMY_TASK_NAME,
+                                "0",
+                                DELETE_FILE_FAILED_COUNTER),
+                            0L)
+                        .put(
+                            ImmutableList.of(
+                                DELETE_FILES_OPERATOR_NAME + "[0]",
+                                table.name(),
+                                DUMMY_TASK_NAME,
+                                "0",
+                                DELETE_FILE_SUCCEEDED_COUNTER),
+                            1L)
+                        .build()));
+  }
+
+  private static boolean checkDeleteFinished(String tableName, Long expectedDeleteNum) {
+    return expectedDeleteNum.equals(
+        MetricsReporterFactoryForTests.counter(
+            ImmutableList.of(
+                DELETE_FILES_OPERATOR_NAME + "[0]",
+                tableName,
+                DUMMY_TASK_NAME,
+                "0",
+                DELETE_FILE_SUCCEEDED_COUNTER)));
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestJdbcLockFactory.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java
similarity index 92%
rename from flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestJdbcLockFactory.java
rename to flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java
index 051d09d92bad..c8fa2a7d362a 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestJdbcLockFactory.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestJdbcLockFactory.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
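
The `List<String>` keys used in these counter assertions mirror the metric-group hierarchy that `TriggerManager` now registers: group key, table name, task name, task index, then the counter name. A sketch of how `MetricsReporterFactoryForTests` (changed later in this diff) collapses such a key into a single comparable name, assuming the `.`-to-`_` escaping shown in its `longName` helper; the key parts below are illustrative:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MetricKeySketch {
  public static void main(String[] args) {
    // Illustrative key parts: operator name, table name, task name, task index, counter name.
    List<String> key = List.of("Delete file[0]", "db.tbl", "dummyTask", "0", "succeededTasks");

    // Same transformation as the reporter's longName(): escape '.' inside each part,
    // then join the parts with '.' so dotted table names cannot break the hierarchy.
    String longName =
        key.stream().map(s -> s.replaceAll("\\.", "_")).collect(Collectors.joining("."));

    System.out.println(longName); // Delete file[0].db_tbl.dummyTask.0.succeededTasks
  }
}
```
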
*/ -package org.apache.iceberg.flink.maintenance.operator; +package org.apache.iceberg.flink.maintenance.api; -import static org.apache.iceberg.flink.maintenance.operator.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY; +import static org.apache.iceberg.flink.maintenance.api.JdbcLockFactory.INIT_LOCK_TABLES_PROPERTY; import static org.assertj.core.api.Assertions.assertThat; import java.util.Map; diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockFactoryBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestLockFactoryBase.java similarity index 97% rename from flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockFactoryBase.java rename to flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestLockFactoryBase.java index bf9e86f2534d..c06bef9087d5 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockFactoryBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestLockFactoryBase.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.flink.maintenance.operator; +package org.apache.iceberg.flink.maintenance.api; import static org.assertj.core.api.Assertions.assertThat; diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java new file mode 100644 index 000000000000..467ad2d8ced9 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.api; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.time.Duration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TestMaintenanceE2E extends OperatorTestBase { + private StreamExecutionEnvironment env; + + @BeforeEach + public void beforeEach() throws IOException { + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); + Table table = createTable(); + insert(table, 1, "a"); + } + + @Test + void testE2e() throws Exception { + TableMaintenance.forTable(env, tableLoader(), LOCK_FACTORY) + .uidSuffix("E2eTestUID") + .rateLimit(Duration.ofMinutes(10)) + .lockCheckDelay(Duration.ofSeconds(10)) + .add( + ExpireSnapshots.builder() + .scheduleOnCommitCount(10) + .maxSnapshotAge(Duration.ofMinutes(10)) + .retainLast(5) + .deleteBatchSize(5) + .parallelism(8)) + .append(); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // Just make sure that we are able to instantiate the flow + assertThat(jobClient).isNotNull(); + } finally { + closeJobClient(jobClient); + } + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java new file mode 100644 index 000000000000..0e4a72bd16f8 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.api; + +import static org.apache.iceberg.flink.SimpleDataUtil.createRowData; +import static org.apache.iceberg.flink.maintenance.api.TableMaintenance.LOCK_REMOVER_OPERATOR_NAME; +import static org.apache.iceberg.flink.maintenance.api.TableMaintenance.SOURCE_OPERATOR_NAME_PREFIX; +import static org.apache.iceberg.flink.maintenance.api.TableMaintenance.TRIGGER_MANAGER_OPERATOR_NAME; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.FAILED_TASK_COUNTER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.ManualSource; +import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests; +import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class TestTableMaintenance extends OperatorTestBase { + private static final String[] TASKS = + new String[] { + MaintenanceTaskBuilderForTest.class.getSimpleName() + " [0]", + MaintenanceTaskBuilderForTest.class.getSimpleName() + " [1]" + }; + private static final TableChange DUMMY_CHANGE = TableChange.builder().commitCount(1).build(); + private static final List PROCESSED = + Collections.synchronizedList(Lists.newArrayListWithCapacity(1)); + + private StreamExecutionEnvironment env; + private Table table; + + @TempDir private File checkpointDir; + + @BeforeEach + public void beforeEach() throws IOException { + 
Configuration config = new Configuration(); + config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + checkpointDir.getPath()); + this.env = StreamExecutionEnvironment.getExecutionEnvironment(config); + this.table = createTable(); + insert(table, 1, "a"); + + PROCESSED.clear(); + MaintenanceTaskBuilderForTest.counter = 0; + } + + @Test + void testForChangeStream() throws Exception { + ManualSource schedulerSource = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + + TableMaintenance.Builder streamBuilder = + TableMaintenance.forChangeStream(schedulerSource.dataStream(), tableLoader(), LOCK_FACTORY) + .rateLimit(Duration.ofMillis(2)) + .lockCheckDelay(Duration.ofSeconds(3)) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .scheduleOnDataFileCount(2) + .scheduleOnDataFileSize(3L) + .scheduleOnEqDeleteFileCount(4) + .scheduleOnEqDeleteRecordCount(5L) + .scheduleOnPosDeleteFileCount(6) + .scheduleOnPosDeleteRecordCount(7L) + .scheduleOnInterval(Duration.ofHours(1))); + + sendEvents(schedulerSource, streamBuilder, ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1))); + } + + @Test + void testForTable() throws Exception { + TableLoader tableLoader = tableLoader(); + + env.enableCheckpointing(10); + + TableMaintenance.forTable(env, tableLoader, LOCK_FACTORY) + .rateLimit(Duration.ofMillis(2)) + .maxReadBack(2) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(2)) + .append(); + + // Creating a stream for inserting data into the table concurrently + ManualSource insertSource = + new ManualSource<>(env, InternalTypeInfo.of(FlinkSchemaUtil.convert(table.schema()))); + FlinkSink.forRowData(insertSource.dataStream()) + .tableLoader(tableLoader) + .uidPrefix(UID_SUFFIX + "-iceberg-sink") + .append(); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + insertSource.sendRecord(createRowData(2, "b")); + + Awaitility.await().until(() -> PROCESSED.size() == 1); + } finally { + closeJobClient(jobClient); + } + } + + @Test + void testLocking() throws Exception { + TriggerLockFactory.Lock lock = LOCK_FACTORY.createLock(); + + ManualSource schedulerSource = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + + TableMaintenance.Builder streamBuilder = + TableMaintenance.forChangeStream(schedulerSource.dataStream(), tableLoader(), LOCK_FACTORY) + .rateLimit(Duration.ofMillis(2)) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)); + + assertThat(lock.isHeld()).isFalse(); + sendEvents(schedulerSource, streamBuilder, ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1))); + + assertThat(lock.isHeld()).isFalse(); + } + + @Test + void testMetrics() throws Exception { + ManualSource schedulerSource = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + + TableMaintenance.Builder streamBuilder = + TableMaintenance.forChangeStream(schedulerSource.dataStream(), tableLoader(), LOCK_FACTORY) + .rateLimit(Duration.ofMillis(2)) + .lockCheckDelay(Duration.ofMillis(2)) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)) + .add(new MaintenanceTaskBuilderForTest(false).scheduleOnCommitCount(2)); + + sendEvents( + schedulerSource, + streamBuilder, + ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1), Tuple2.of(DUMMY_CHANGE, 2))); + + Awaitility.await() + .until( + () -> + MetricsReporterFactoryForTests.counter( + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, + table.name(), + TASKS[0], + "0", + 
SUCCEEDED_TASK_COUNTER)) + .equals(2L)); + + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder, Long>() + .put( + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, + table.name(), + TASKS[0], + "0", + SUCCEEDED_TASK_COUNTER), + 2L) + .put( + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, table.name(), TASKS[0], "0", FAILED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + TRIGGER_MANAGER_OPERATOR_NAME, table.name(), TASKS[0], "0", TRIGGERED), + 2L) + .put( + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, + table.name(), + TASKS[1], + "1", + SUCCEEDED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + LOCK_REMOVER_OPERATOR_NAME, table.name(), TASKS[1], "1", FAILED_TASK_COUNTER), + 1L) + .put( + ImmutableList.of( + TRIGGER_MANAGER_OPERATOR_NAME, table.name(), TASKS[1], "1", TRIGGERED), + 1L) + .put( + ImmutableList.of(TRIGGER_MANAGER_OPERATOR_NAME, table.name(), NOTHING_TO_TRIGGER), + -1L) + .put( + ImmutableList.of( + TRIGGER_MANAGER_OPERATOR_NAME, table.name(), CONCURRENT_RUN_THROTTLED), + -1L) + .put( + ImmutableList.of( + TRIGGER_MANAGER_OPERATOR_NAME, table.name(), RATE_LIMITER_TRIGGERED), + -1L) + .build()); + } + + @Test + void testUidAndSlotSharingGroup() throws IOException { + TableMaintenance.forChangeStream( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader(), + LOCK_FACTORY) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP)) + .append(); + + checkUidsAreSet(env, UID_SUFFIX); + checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); + } + + @Test + void testUidAndSlotSharingGroupUnset() throws IOException { + TableMaintenance.forChangeStream( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader(), + LOCK_FACTORY) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)) + .append(); + + checkUidsAreSet(env, null); + checkSlotSharingGroupsAreSet(env, null); + } + + @Test + void testUidAndSlotSharingGroupInherit() throws IOException { + TableMaintenance.forChangeStream( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader(), + LOCK_FACTORY) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)) + .append(); + + checkUidsAreSet(env, UID_SUFFIX); + checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); + } + + @Test + void testUidAndSlotSharingGroupOverWrite() throws IOException { + String anotherUid = "Another-UID"; + String anotherSlotSharingGroup = "Another-SlotSharingGroup"; + TableMaintenance.forChangeStream( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader(), + LOCK_FACTORY) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .uidSuffix(anotherUid) + .slotSharingGroup(anotherSlotSharingGroup)) + .append(); + + // Choose an operator from the scheduler part of the graph + Transformation schedulerTransformation = + env.getTransformations().stream() + .filter(t -> t.getName().equals("Trigger manager")) + .findFirst() + .orElseThrow(); + assertThat(schedulerTransformation.getUid()).contains(UID_SUFFIX); + assertThat(schedulerTransformation.getSlotSharingGroup()).isPresent(); + assertThat(schedulerTransformation.getSlotSharingGroup().get().getName()) + 
.isEqualTo(SLOT_SHARING_GROUP);
+
+    // Choose an operator from the maintenance task part of the graph
+    Transformation<?> scheduledTransformation =
+        env.getTransformations().stream()
+            .filter(
+                t -> t.getName().startsWith(MaintenanceTaskBuilderForTest.class.getSimpleName()))
+            .findFirst()
+            .orElseThrow();
+    assertThat(scheduledTransformation.getUid()).contains(anotherUid);
+    assertThat(scheduledTransformation.getSlotSharingGroup()).isPresent();
+    assertThat(scheduledTransformation.getSlotSharingGroup().get().getName())
+        .isEqualTo(anotherSlotSharingGroup);
+  }
+
+  @Test
+  void testUidAndSlotSharingGroupForMonitorSource() throws IOException {
+    TableMaintenance.forTable(env, tableLoader(), LOCK_FACTORY)
+        .uidSuffix(UID_SUFFIX)
+        .slotSharingGroup(SLOT_SHARING_GROUP)
+        .add(
+            new MaintenanceTaskBuilderForTest(true)
+                .scheduleOnCommitCount(1)
+                .uidSuffix(UID_SUFFIX)
+                .slotSharingGroup(SLOT_SHARING_GROUP))
+        .append();
+
+    Transformation<?> source = monitorSource();
+    assertThat(source).isNotNull();
+    assertThat(source.getUid()).contains(UID_SUFFIX);
+    assertThat(source.getSlotSharingGroup()).isPresent();
+    assertThat(source.getSlotSharingGroup().get().getName()).isEqualTo(SLOT_SHARING_GROUP);
+
+    checkUidsAreSet(env, UID_SUFFIX);
+    checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
+  }
+
+  /**
+   * Sends the events through the {@link ManualSource} provided, and waits until the given number
+   * of records are processed.
+   *
+   * @param schedulerSource used for sending the events
+   * @param streamBuilder used for generating the job
+   * @param eventsAndResultNumbers pairs of the event to send and the expected number of processed
+   *     records after it
+   * @throws Exception if any error occurs while running the job
+   */
+  private void sendEvents(
+      ManualSource<TableChange> schedulerSource,
+      TableMaintenance.Builder streamBuilder,
+      List<Tuple2<TableChange, Integer>> eventsAndResultNumbers)
+      throws Exception {
+    streamBuilder.append();
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      eventsAndResultNumbers.forEach(
+          eventsAndResultNumber -> {
+            int expectedSize = PROCESSED.size() + eventsAndResultNumber.f1;
+            schedulerSource.sendRecord(eventsAndResultNumber.f0);
+            Awaitility.await()
+                .until(
+                    () -> PROCESSED.size() == expectedSize && !LOCK_FACTORY.createLock().isHeld());
+          });
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  /**
+   * Finds the {@link org.apache.iceberg.flink.maintenance.operator.MonitorSource} for testing
+   * purposes by parsing the transformation tree.
+ * + * @return The monitor source if we found it + */ + private Transformation monitorSource() { + assertThat(env.getTransformations()).isNotEmpty(); + assertThat(env.getTransformations().get(0).getInputs()).isNotEmpty(); + assertThat(env.getTransformations().get(0).getInputs().get(0).getInputs()).isNotEmpty(); + + Transformation result = + env.getTransformations().get(0).getInputs().get(0).getInputs().get(0); + + // Some checks to make sure this is the transformation we are looking for + assertThat(result).isInstanceOf(SourceTransformation.class); + assertThat(result.getName()).startsWith(SOURCE_OPERATOR_NAME_PREFIX); + + return result; + } + + private static class MaintenanceTaskBuilderForTest + extends MaintenanceTaskBuilder { + private final boolean success; + private final int id; + private static int counter = 0; + + MaintenanceTaskBuilderForTest(boolean success) { + this.success = success; + this.id = counter; + ++counter; + } + + @Override + DataStream append(DataStream trigger) { + String name = TASKS[id]; + return trigger + .map(new DummyMaintenanceTask(success)) + .name(name) + .uid(uidSuffix() + "-test-mapper-" + name + "-" + id) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + } + } + + private static class DummyMaintenanceTask + implements MapFunction, ResultTypeQueryable, Serializable { + private final boolean success; + + private DummyMaintenanceTask(boolean success) { + this.success = success; + } + + @Override + public TaskResult map(Trigger trigger) { + // Ensure that the lock is held when processing + assertThat(LOCK_FACTORY.createLock().isHeld()).isTrue(); + PROCESSED.add(trigger); + + return new TaskResult( + trigger.taskId(), + trigger.timestamp(), + success, + success ? Collections.emptyList() : Lists.newArrayList(new Exception("Testing error"))); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(TaskResult.class); + } + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java index 9b6580fad0bf..e7e818ba6887 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java @@ -31,7 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; /** Sink for collecting output during testing. */ -class CollectingSink implements Sink { +public class CollectingSink implements Sink { private static final long serialVersionUID = 1L; private static final List> QUEUES = Collections.synchronizedList(Lists.newArrayListWithExpectedSize(1)); @@ -39,7 +39,7 @@ class CollectingSink implements Sink { private final int index; /** Creates a new sink which collects the elements received. 
*/ - CollectingSink() { + public CollectingSink() { this.index = NUM_SINKS.incrementAndGet(); QUEUES.add(new LinkedBlockingQueue<>()); } @@ -69,7 +69,7 @@ boolean isEmpty() { * @return The first element received by this {@link Sink} * @throws TimeoutException if no element received until the timeout */ - T poll(Duration timeout) throws TimeoutException { + public T poll(Duration timeout) throws TimeoutException { Object element; try { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java deleted file mode 100644 index 36e162d4f068..000000000000 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.maintenance.operator; - -class ConstantsForTests { - public static final long EVENT_TIME = 10L; - static final long EVENT_TIME_2 = 11L; - static final String DUMMY_NAME = "dummy"; - - private ConstantsForTests() { - // Do not instantiate - } -} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java deleted file mode 100644 index 91d36aa3e85d..000000000000 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
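
With `CollectingSink` and its `poll(Duration)` now public, API-level tests outside the operator package can block on emitted results directly. A sketch of the pattern; the timeout value is illustrative and the job wiring is elided:

```java
// Sketch: collect TaskResults from a maintenance task and block until one arrives.
CollectingSink<TaskResult> sink = new CollectingSink<>();
// ... wire the task with .sinkTo(sink) and start the job with env.executeAsync() ...
TaskResult result = sink.poll(Duration.ofSeconds(5)); // throws TimeoutException if nothing arrives
assertThat(result.success()).isTrue();
```
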
- */ -package org.apache.iceberg.flink.maintenance.operator; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.flink.CatalogLoader; -import org.apache.iceberg.flink.TableLoader; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeEachCallback; -import org.junit.jupiter.api.extension.ExtensionContext; - -/** - * Junit 5 extension for running Flink SQL queries. {@link - * org.apache.flink.test.junit5.MiniClusterExtension} is used for executing the SQL batch jobs. - */ -public class FlinkSqlExtension implements BeforeEachCallback, AfterEachCallback { - private final String catalogName; - private final Map catalogProperties; - private final String databaseName; - private final Path warehouse; - private final CatalogLoader catalogLoader; - private TableEnvironment tableEnvironment; - - public FlinkSqlExtension( - String catalogName, Map catalogProperties, String databaseName) { - this.catalogName = catalogName; - this.catalogProperties = Maps.newHashMap(catalogProperties); - this.databaseName = databaseName; - - // Add temporary dir as a warehouse location - try { - this.warehouse = Files.createTempDirectory("warehouse"); - } catch (IOException e) { - throw new RuntimeException(e); - } - this.catalogProperties.put( - CatalogProperties.WAREHOUSE_LOCATION, String.format("file://%s", warehouse)); - this.catalogLoader = - CatalogLoader.hadoop(catalogName, new Configuration(), this.catalogProperties); - } - - @Override - public void beforeEach(ExtensionContext context) { - // We need to recreate the tableEnvironment for every test as the minicluster is recreated - this.tableEnvironment = - TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); - exec("CREATE CATALOG %s WITH %s", catalogName, toWithClause(catalogProperties)); - exec("CREATE DATABASE %s.%s", catalogName, databaseName); - exec("USE CATALOG %s", catalogName); - exec("USE %s", databaseName); - } - - @Override - public void afterEach(ExtensionContext context) throws IOException { - List tables = exec("SHOW TABLES"); - tables.forEach(t -> exec("DROP TABLE IF EXISTS %s", t.getField(0))); - exec("USE CATALOG default_catalog"); - exec("DROP CATALOG IF EXISTS %s", catalogName); - try (Stream files = Files.walk(warehouse)) { - files.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); - } - } - - /** - * Executes an SQL query with the given parameters. The parameter substitution is done by {@link - * String#format(String, Object...)}. - * - * @param query to run - * @param parameters to substitute to the query - * @return The {@link Row}s returned by the query - */ - public List exec(String query, Object... 
parameters) { - TableResult tableResult = tableEnvironment.executeSql(String.format(query, parameters)); - try (CloseableIterator iter = tableResult.collect()) { - return Lists.newArrayList(iter); - } catch (Exception e) { - throw new RuntimeException("Failed to collect table result", e); - } - } - - /** - * Returns the {@link TableLoader} which could be used to access the given table. - * - * @param tableName of the table - * @return the {@link TableLoader} for the table - */ - public TableLoader tableLoader(String tableName) { - TableLoader tableLoader = - TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(databaseName, tableName)); - tableLoader.open(); - return tableLoader; - } - - private static String toWithClause(Map props) { - return String.format( - "(%s)", - props.entrySet().stream() - .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue())) - .collect(Collectors.joining(","))); - } -} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java deleted file mode 100644 index 9cdc55cb0cce..000000000000 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.maintenance.operator; - -import java.io.File; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; -import org.awaitility.Awaitility; - -class FlinkStreamingTestUtils { - private FlinkStreamingTestUtils() { - // Do not instantiate - } - - /** - * Close the {@link JobClient} and wait for the job closure. If the savepointDir is specified, it - * stops the job with a savepoint. - * - * @param jobClient the job to close - * @param savepointDir the savepointDir to store the last savepoint. If null then - * stop without a savepoint. 
- * @return configuration for restarting the job from the savepoint - */ - static Configuration closeJobClient(JobClient jobClient, File savepointDir) { - Configuration conf = new Configuration(); - if (jobClient != null) { - if (savepointDir != null) { - // Stop with savepoint - jobClient.stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL); - // Wait until the savepoint is created and the job has been stopped - Awaitility.await().until(() -> savepointDir.listFiles(File::isDirectory).length == 1); - conf.set( - SavepointConfigOptions.SAVEPOINT_PATH, - savepointDir.listFiles(File::isDirectory)[0].getAbsolutePath()); - } else { - jobClient.cancel(); - } - - // Wait until the job has been stopped - Awaitility.await().until(() -> jobClient.getJobStatus().get().isTerminalState()); - return conf; - } - - return null; - } - - /** - * Close the {@link JobClient} and wait for the job closure. - * - * @param jobClient the job to close - */ - static void closeJobClient(JobClient jobClient) { - closeJobClient(jobClient, null); - } -} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java index 679b3ec508a2..eff32fcfa118 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java @@ -44,7 +44,7 @@ import org.jetbrains.annotations.Nullable; /** Testing source implementation for Flink sources which can be triggered manually. */ -class ManualSource +public class ManualSource implements Source, ResultTypeQueryable { @@ -65,7 +65,7 @@ class ManualSource * @param env to register the source * @param type of the events returned by the source */ - ManualSource(StreamExecutionEnvironment env, TypeInformation type) { + public ManualSource(StreamExecutionEnvironment env, TypeInformation type) { this.type = type; this.env = env; this.index = numSources++; @@ -78,7 +78,7 @@ class ManualSource * * @param event to emit */ - void sendRecord(T event) { + public void sendRecord(T event) { this.sendInternal(Tuple2.of(event, null)); } @@ -88,7 +88,7 @@ void sendRecord(T event) { * @param event to emit * @param eventTime of the event */ - void sendRecord(T event, long eventTime) { + public void sendRecord(T event, long eventTime) { this.sendInternal(Tuple2.of(event, eventTime)); } @@ -97,7 +97,7 @@ void sendRecord(T event, long eventTime) { * * @param timeStamp of the watermark */ - void sendWatermark(long timeStamp) { + public void sendWatermark(long timeStamp) { this.sendInternal(Tuple2.of(null, timeStamp)); } @@ -112,7 +112,7 @@ void markFinished() { * * @return the stream emitted by this source */ - DataStream dataStream() { + public DataStream dataStream() { if (this.stream == null) { this.stream = this.env diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java index 7a523035b7fb..ed66ff3df076 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java @@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat; 
import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -38,10 +39,24 @@ public class MetricsReporterFactoryForTests implements MetricReporterFactory { private static final TestMetricsReporter INSTANCE = new TestMetricsReporter(); - private static final Pattern FULL_METRIC_NAME = + private static final Pattern TASK_METRIC_NAME = Pattern.compile( "\\.taskmanager\\.[^.]+\\.[^.]+\\.([^.]+)\\.\\d+\\." + TableMaintenanceMetrics.GROUP_KEY + + "\\." + + TableMaintenanceMetrics.TABLE_NAME_KEY + + "\\.([^.]+)\\." + + TableMaintenanceMetrics.TASK_NAME_KEY + + "\\.([^.]+)\\." + + TableMaintenanceMetrics.TASK_INDEX_KEY + + "\\.([^.]+)\\.([^.]+)"); + + private static final Pattern MAIN_METRIC_NAME = + Pattern.compile( + "\\.taskmanager\\.[^.]+\\.[^.]+\\.([^.]+)\\.\\d+\\." + + TableMaintenanceMetrics.GROUP_KEY + + "\\." + + TableMaintenanceMetrics.TABLE_NAME_KEY + "\\.([^.]+)\\.([^.]+)"); private static Map counters = Maps.newConcurrentMap(); @@ -72,20 +87,26 @@ public static void reset() { gauges = Maps.newConcurrentMap(); } - public static Long counter(String name) { - return counterValues().get(name); + public static Long counter(List parts) { + return counterValues().get(longName(parts)); } - public static Long gauge(String name) { - return gaugeValues().get(name); + public static Long gauge(List parts) { + return gaugeValues().get(longName(parts)); } - public static void assertGauges(Map expected) { - assertThat(filter(gaugeValues(), expected)).isEqualTo(filter(expected, expected)); + public static void assertGauges(Map, Long> expected) { + Map transformed = + expected.entrySet().stream() + .collect(Collectors.toMap(k -> longName(k.getKey()), Map.Entry::getValue)); + assertThat(filter(gaugeValues(), transformed)).isEqualTo(filter(transformed, transformed)); } - public static void assertCounters(Map expected) { - assertThat(filter(counterValues(), expected)).isEqualTo(filter(expected, expected)); + public static void assertCounters(Map, Long> expected) { + Map transformed = + expected.entrySet().stream() + .collect(Collectors.toMap(k -> longName(k.getKey()), Map.Entry::getValue)); + assertThat(filter(counterValues(), transformed)).isEqualTo(filter(transformed, transformed)); } private static Map gaugeValues() { @@ -113,12 +134,30 @@ private static Map filter(Map original, Map parts) { + return parts.stream().map(s -> s.replaceAll("\\.", "_")).collect(Collectors.joining(".")); } private static class TestMetricsReporter implements MetricReporter { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index 225853086545..5e05f40e53cf 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -19,27 +19,62 @@ package org.apache.iceberg.flink.maintenance.operator; import static org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; +import java.io.IOException; +import java.nio.file.Path; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import 
org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.test.junit5.MiniClusterExtension; -import org.apache.iceberg.flink.FlinkCatalogFactory; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; -class OperatorTestBase { +public class OperatorTestBase { private static final int NUMBER_TASK_MANAGERS = 1; private static final int SLOTS_PER_TASK_MANAGER = 8; - private static final TriggerLockFactory.Lock MAINTENANCE_LOCK = new MemoryLock(); - private static final TriggerLockFactory.Lock RECOVERY_LOCK = new MemoryLock(); + private static final Schema SCHEMA_WITH_PRIMARY_KEY = + new Schema( + Lists.newArrayList( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())), + ImmutableMap.of(), + ImmutableSet.of(SimpleDataUtil.SCHEMA.columns().get(0).fieldId())); - static final String TABLE_NAME = "test_table"; + protected static final String UID_SUFFIX = "UID-Dummy"; + protected static final String SLOT_SHARING_GROUP = "SlotSharingGroup"; + protected static final TriggerLockFactory LOCK_FACTORY = new MemoryLockFactory(); + + public static final String IGNORED_OPERATOR_NAME = "Ignore"; + + static final long EVENT_TIME = 10L; + static final long EVENT_TIME_2 = 11L; + protected static final String DUMMY_TASK_NAME = "dummyTask"; + protected static final String DUMMY_TABLE_NAME = "dummyTable"; @RegisterExtension protected static final MiniClusterExtension MINI_CLUSTER_EXTENSION = @@ -50,43 +85,57 @@ class OperatorTestBase { .setConfiguration(config()) .build()); + @TempDir private Path warehouseDir; + @RegisterExtension - final FlinkSqlExtension sql = - new FlinkSqlExtension( - "catalog", - ImmutableMap.of("type", "iceberg", FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"), - "db"); + private static final HadoopCatalogExtension CATALOG_EXTENSION = + new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE); - private static Configuration config() { - Configuration config = new Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG); - MetricOptions.forReporter(config, "test_reporter") - .set(MetricOptions.REPORTER_FACTORY_CLASS, MetricsReporterFactoryForTests.class.getName()); - return config; + @BeforeEach + void before() { + LOCK_FACTORY.open(); + MetricsReporterFactoryForTests.reset(); } - protected static 
TriggerLockFactory lockFactory() { - return new TriggerLockFactory() { - @Override - public void open() { - MAINTENANCE_LOCK.unlock(); - RECOVERY_LOCK.unlock(); - } + @AfterEach + void after() throws IOException { + LOCK_FACTORY.close(); + } - @Override - public Lock createLock() { - return MAINTENANCE_LOCK; - } + protected static Table createTable() { + return CATALOG_EXTENSION + .catalog() + .createTable( + TestFixtures.TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + PartitionSpec.unpartitioned(), + null, + ImmutableMap.of("flink.max-continuous-empty-commits", "100000")); + } - @Override - public Lock createRecoveryLock() { - return RECOVERY_LOCK; - } + protected static Table createTableWithDelete() { + return CATALOG_EXTENSION + .catalog() + .createTable( + TestFixtures.TABLE_IDENTIFIER, + SCHEMA_WITH_PRIMARY_KEY, + PartitionSpec.unpartitioned(), + null, + ImmutableMap.of("format-version", "2", "write.upsert.enabled", "true")); + } - @Override - public void close() { - // do nothing - } - }; + protected void insert(Table table, Integer id, String data) throws IOException { + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data))); + table.refresh(); + } + + protected void dropTable() { + CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + } + + protected TableLoader tableLoader() { + return CATALOG_EXTENSION.tableLoader(); } /** @@ -98,7 +147,7 @@ public void close() { * stop without a savepoint. * @return configuration for restarting the job from the savepoint */ - public static Configuration closeJobClient(JobClient jobClient, File savepointDir) { + protected static Configuration closeJobClient(JobClient jobClient, File savepointDir) { Configuration conf = new Configuration(); if (jobClient != null) { if (savepointDir != null) { @@ -126,12 +175,45 @@ public static Configuration closeJobClient(JobClient jobClient, File savepointDi * * @param jobClient the job to close */ - public static void closeJobClient(JobClient jobClient) { + protected static void closeJobClient(JobClient jobClient) { closeJobClient(jobClient, null); } + protected static void checkUidsAreSet(StreamExecutionEnvironment env, String uidSuffix) { + env.getTransformations().stream() + .filter( + t -> !(t instanceof SinkTransformation) && !(t.getName().equals(IGNORED_OPERATOR_NAME))) + .forEach( + transformation -> { + assertThat(transformation.getUid()).isNotNull(); + if (uidSuffix != null) { + assertThat(transformation.getUid()).contains(UID_SUFFIX); + } + }); + } + + protected static void checkSlotSharingGroupsAreSet(StreamExecutionEnvironment env, String name) { + String nameToCheck = name != null ? 
name : StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + + env.getTransformations().stream() + .filter( + t -> !(t instanceof SinkTransformation) && !(t.getName().equals(IGNORED_OPERATOR_NAME))) + .forEach( + t -> { + assertThat(t.getSlotSharingGroup()).isPresent(); + assertThat(t.getSlotSharingGroup().get().getName()).isEqualTo(nameToCheck); + }); + } + + private static Configuration config() { + Configuration config = new Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG); + MetricOptions.forReporter(config, "test_reporter") + .set(MetricOptions.REPORTER_FACTORY_CLASS, MetricsReporterFactoryForTests.class.getName()); + return config; + } + private static class MemoryLock implements TriggerLockFactory.Lock { - boolean locked = false; + volatile boolean locked = false; @Override public boolean tryLock() { @@ -153,4 +235,30 @@ public void unlock() { locked = false; } } + + private static class MemoryLockFactory implements TriggerLockFactory { + private static final TriggerLockFactory.Lock MAINTENANCE_LOCK = new MemoryLock(); + private static final TriggerLockFactory.Lock RECOVERY_LOCK = new MemoryLock(); + + @Override + public void open() { + MAINTENANCE_LOCK.unlock(); + RECOVERY_LOCK.unlock(); + } + + @Override + public Lock createLock() { + return MAINTENANCE_LOCK; + } + + @Override + public Lock createRecoveryLock() { + return RECOVERY_LOCK; + } + + @Override + public void close() { + // do nothing + } + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java new file mode 100644 index 000000000000..d70c4aafd59a --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
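
The reworked `OperatorTestBase` gives subclasses a Hadoop-catalog-backed table plus insert/drop helpers in place of the old SQL extension. A sketch of a hypothetical subclass exercising them; the assertion target is illustrative:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import org.apache.iceberg.Table;
import org.junit.jupiter.api.Test;

// Hypothetical subclass sketch using the helpers introduced above.
class TableHelpersSketchTest extends OperatorTestBase {

  @Test
  void sketch() throws IOException {
    Table table = createTable(); // unpartitioned table in the Hadoop catalog extension
    insert(table, 1, "a"); // appends one Parquet data file and refreshes the table
    assertThat(table.currentSnapshot()).isNotNull();
    dropTable(); // removes the table through the catalog loader
  }
}
```
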
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TestDeleteFilesProcessor extends OperatorTestBase { + private static final String DUMMY_FILE_NAME = "dummy"; + private static final Set TABLE_FILES = + ImmutableSet.of( + "metadata/v1.metadata.json", + "metadata/version-hint.text", + "metadata/.version-hint.text.crc", + "metadata/.v1.metadata.json.crc"); + + private Table table; + + @BeforeEach + void before() { + this.table = createTable(); + } + + @Test + void testDelete() throws Exception { + // Write an extra file + Path dummyFile = Path.of(tablePath(table).toString(), DUMMY_FILE_NAME); + Files.write(dummyFile, "DUMMY".getBytes(StandardCharsets.UTF_8)); + + Set files = listFiles(table); + assertThat(files) + .containsAll(TABLE_FILES) + .contains(DUMMY_FILE_NAME) + .hasSize(TABLE_FILES.size() + 1); + + deleteFile(tableLoader(), dummyFile.toString()); + + assertThat(listFiles(table)).isEqualTo(TABLE_FILES); + } + + @Test + void testDeleteMissingFile() throws Exception { + Path dummyFile = + FileSystems.getDefault().getPath(table.location().substring(5), DUMMY_FILE_NAME); + + deleteFile(tableLoader(), dummyFile.toString()); + + assertThat(listFiles(table)).isEqualTo(TABLE_FILES); + } + + @Test + void testInvalidURIScheme() throws Exception { + deleteFile(tableLoader(), "wrong://"); + + assertThat(listFiles(table)).isEqualTo(TABLE_FILES); + } + + private void deleteFile(TableLoader tableLoader, String fileName) throws Exception { + tableLoader().open(); + try (OneInputStreamOperatorTestHarness testHarness = + new OneInputStreamOperatorTestHarness<>( + new DeleteFilesProcessor(0, DUMMY_TASK_NAME, tableLoader.loadTable(), 10), + StringSerializer.INSTANCE)) { + testHarness.open(); + testHarness.processElement(fileName, System.currentTimeMillis()); + testHarness.processWatermark(EVENT_TIME); + testHarness.endInput(); + } + } + + private static Path tablePath(Table table) { + return FileSystems.getDefault().getPath(table.location().substring(5)); + } + + private static Set listFiles(Table table) throws IOException { + String tableRootPath = TestFixtures.TABLE_IDENTIFIER.toString().replace(".", "/"); + return Files.find( + tablePath(table), Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) + .map( + p -> + p.toString() + .substring(p.toString().indexOf(tableRootPath) + tableRootPath.length() + 1)) + .collect(Collectors.toSet()); + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java new file mode 100644 index 000000000000..d312fc312c99 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java @@ 
-0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Queue; +import java.util.Set; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.api.TaskResult; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class TestExpireSnapshotsProcessor extends OperatorTestBase { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testExpire(boolean success) throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + + List<TaskResult> actual; + Queue<StreamRecord<List<String>>> deletes; + try (OneInputStreamOperatorTestHarness<Trigger, TaskResult> testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10))) { + testHarness.open(); + + if (!success) { + // Cause an exception + dropTable(); + } + + testHarness.processElement(Trigger.create(10, 11), System.currentTimeMillis()); + deletes = testHarness.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM); + actual = testHarness.extractOutputValues(); + } + + assertThat(actual).hasSize(1); + TaskResult result = actual.get(0); + assertThat(result.startEpoch()).isEqualTo(10); + assertThat(result.taskIndex()).isEqualTo(11); + assertThat(result.success()).isEqualTo(success); + + if (success) { + assertThat(result.exceptions()).isNotNull().isEmpty(); + + table.refresh(); + Set<Snapshot> snapshots = Sets.newHashSet(table.snapshots()); + assertThat(snapshots).hasSize(1); + assertThat(deletes).hasSize(1); + } else { + assertThat(result.exceptions()).isNotNull().hasSize(1); + assertThat(deletes).isNull(); + } + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java index ccb90ec33d9c..f75c765df967 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java @@ -18,14 +18,15 @@ */ package org.apache.iceberg.flink.maintenance.operator; -import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME;
import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.FAILED_TASK_COUNTER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.LAST_RUN_DURATION_MS; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER; import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.time.Duration; import java.util.Collection; +import java.util.List; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; @@ -44,6 +45,9 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; +import org.apache.iceberg.flink.maintenance.api.TaskResult; +import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.awaitility.Awaitility; @@ -54,7 +58,7 @@ @Timeout(value = 10) class TestLockRemover extends OperatorTestBase { - private static final String[] TASKS = new String[] {"task0", "task1"}; + private static final String[] TASKS = new String[] {"task0", "task1", "task2"}; private static final TriggerLockFactory.Lock LOCK = new TestingLock(); private static final TriggerLockFactory.Lock RECOVERY_LOCK = new TestingLock(); @@ -72,9 +76,9 @@ void testProcess() throws Exception { source .dataStream() .transform( - DUMMY_NAME, + DUMMY_TASK_NAME, TypeInformation.of(Void.class), - new LockRemover(new TestingLockFactory(), Lists.newArrayList(TASKS))) + new LockRemover(DUMMY_TABLE_NAME, new TestingLockFactory(), Lists.newArrayList(TASKS))) .setParallelism(1); JobClient jobClient = null; @@ -129,37 +133,79 @@ void testMetrics() throws Exception { source .dataStream() .transform( - DUMMY_NAME, + DUMMY_TASK_NAME, TypeInformation.of(Void.class), - new LockRemover(new TestingLockFactory(), Lists.newArrayList(TASKS))) + new LockRemover(DUMMY_TABLE_NAME, new TestingLockFactory(), Lists.newArrayList(TASKS))) .setParallelism(1); JobClient jobClient = null; + long time = System.currentTimeMillis(); try { jobClient = env.executeAsync(); // Start the 2 successful and one failed result trigger for task1, and 3 successful for task2 - processAndCheck(source, new TaskResult(0, 0L, true, Lists.newArrayList())); - processAndCheck(source, new TaskResult(1, 1L, true, Lists.newArrayList())); - processAndCheck(source, new TaskResult(1, 2L, true, Lists.newArrayList())); - processAndCheck(source, new TaskResult(0, 3L, false, Lists.newArrayList())); - processAndCheck(source, new TaskResult(0, 4L, true, Lists.newArrayList())); - processAndCheck(source, new TaskResult(1, 5L, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(0, time, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(1, 0L, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(1, 0L, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(0, time, false, Lists.newArrayList())); + processAndCheck(source, new TaskResult(0, time, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(1, 0L, true, Lists.newArrayList())); Awaitility.await() .until( () -> 
MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + TASKS[1] + "." + SUCCEEDED_TASK_COUNTER) + ImmutableList.of( + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, + TASKS[1], + "1", + SUCCEEDED_TASK_COUNTER)) .equals(3L)); // Final check all the counters MetricsReporterFactoryForTests.assertCounters( - new ImmutableMap.Builder<String, Long>() - .put(DUMMY_NAME + "." + TASKS[0] + "." + SUCCEEDED_TASK_COUNTER, 2L) - .put(DUMMY_NAME + "." + TASKS[0] + "." + FAILED_TASK_COUNTER, 1L) - .put(DUMMY_NAME + "." + TASKS[1] + "." + SUCCEEDED_TASK_COUNTER, 3L) - .put(DUMMY_NAME + "." + TASKS[1] + "." + FAILED_TASK_COUNTER, 0L) + new ImmutableMap.Builder<List<String>, Long>() + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", SUCCEEDED_TASK_COUNTER), + 2L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", FAILED_TASK_COUNTER), + 1L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", SUCCEEDED_TASK_COUNTER), + 3L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", FAILED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", SUCCEEDED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", FAILED_TASK_COUNTER), + 0L) .build()); + + assertThat( + MetricsReporterFactoryForTests.gauge( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", LAST_RUN_DURATION_MS))) + .isPositive(); + assertThat( + MetricsReporterFactoryForTests.gauge( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", LAST_RUN_DURATION_MS))) + .isGreaterThan(time); + assertThat( + MetricsReporterFactoryForTests.gauge( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", LAST_RUN_DURATION_MS))) + .isZero(); } finally { closeJobClient(jobClient); } @@ -182,9 +228,10 @@ void testRecovery() throws Exception { .dataStream() .union(source2.dataStream()) .transform( - DUMMY_NAME, + DUMMY_TASK_NAME, TypeInformation.of(Void.class), - new LockRemover(new TestingLockFactory(), Lists.newArrayList(TASKS[0]))) + new LockRemover( + DUMMY_TABLE_NAME, new TestingLockFactory(), Lists.newArrayList(TASKS[0]))) .setParallelism(1); JobClient jobClient = null; @@ -202,7 +249,12 @@ void testRecovery() throws Exception { .until( () -> MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + TASKS[0] + "." + SUCCEEDED_TASK_COUNTER) + ImmutableList.of( + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, + TASKS[0], + "0", + SUCCEEDED_TASK_COUNTER)) .equals(2L)); // We did not remove the recovery lock, as no watermark received from the other source @@ -224,20 +276,21 @@ private void processAndCheck(ManualSource<TaskResult> source, TaskResult input) private void processAndCheck( ManualSource<TaskResult> source, TaskResult input, String counterPrefix) { + List<String> counterKey = + ImmutableList.of( + (counterPrefix != null ? counterPrefix : "") + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, + TASKS[input.taskIndex()], + String.valueOf(input.taskIndex()), + input.success() ? SUCCEEDED_TASK_COUNTER : FAILED_TASK_COUNTER); + Long counterValue = MetricsReporterFactoryForTests.counter(counterKey); + Long expected = counterValue != null ? counterValue + 1 : 1L; + source.sendRecord(input); source.sendWatermark(input.startEpoch()); - String counterName = - (counterPrefix != null ? counterPrefix : "") - .concat( - input.success() - ? DUMMY_NAME + "." + TASKS[input.taskIndex()] + "." + SUCCEEDED_TASK_COUNTER : DUMMY_NAME + "." + TASKS[input.taskIndex()] + "."
+ FAILED_TASK_COUNTER); - Long counterValue = MetricsReporterFactoryForTests.counter(counterName); - Long expected = counterValue != null ? counterValue + 1 : 1L; - Awaitility.await() - .until(() -> expected.equals(MetricsReporterFactoryForTests.counter(counterName))); + .until(() -> expected.equals(MetricsReporterFactoryForTests.counter(counterKey))); } private static class TestingLockFactory implements TriggerLockFactory { @@ -371,9 +424,10 @@ public void flatMap( } }) .transform( - DUMMY_NAME, + DUMMY_TASK_NAME, TypeInformation.of(Void.class), - new LockRemover(new TestingLockFactory(), Lists.newArrayList(TASKS[0]))); + new LockRemover( + DUMMY_TABLE_NAME, new TestingLockFactory(), Lists.newArrayList(TASKS[0]))); } } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java index 3aee05322561..c561c7054eae 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java @@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; +import java.io.IOException; import java.time.Duration; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -60,40 +61,27 @@ class TestMonitorSource extends OperatorTestBase { @ParameterizedTest @ValueSource(booleans = {true, false}) - void testChangeReaderIterator(boolean withDelete) { - if (withDelete) { - sql.exec( - "CREATE TABLE %s (id int, data varchar, PRIMARY KEY(`id`) NOT ENFORCED) WITH ('format-version'='2', 'write.upsert.enabled'='true')", - TABLE_NAME); - } else { - sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); - } - - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); - tableLoader.open(); - Table table = tableLoader.loadTable(); + void testChangeReaderIterator(boolean withDelete) throws IOException { + Table table = withDelete ? 
createTableWithDelete() : createTable(); MonitorSource.TableChangeIterator iterator = - new MonitorSource.TableChangeIterator(tableLoader, null, Long.MAX_VALUE); + new MonitorSource.TableChangeIterator(tableLoader(), null, Long.MAX_VALUE); // For an empty table we get an empty result assertThat(iterator.next()).isEqualTo(EMPTY_EVENT); // Add a single commit and get back the commit data in the event - sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); - table.refresh(); + insert(table, 1, "a"); TableChange expected = tableChangeWithLastSnapshot(table, TableChange.empty()); assertThat(iterator.next()).isEqualTo(expected); // Make sure that consecutive calls do not return the data again assertThat(iterator.next()).isEqualTo(EMPTY_EVENT); // Add two more commits, but fetch the data in one loop - sql.exec("INSERT INTO %s VALUES (2, 'b')", TABLE_NAME); - table.refresh(); + insert(table, 2, "b"); expected = tableChangeWithLastSnapshot(table, TableChange.empty()); - sql.exec("INSERT INTO %s VALUES (3, 'c')", TABLE_NAME); - table.refresh(); + insert(table, 3, "c"); expected = tableChangeWithLastSnapshot(table, expected); assertThat(iterator.next()).isEqualTo(expected); @@ -106,17 +94,11 @@ void testChangeReaderIterator(boolean withDelete) { */ @Test void testSource() throws Exception { - sql.exec( - "CREATE TABLE %s (id int, data varchar) " - + "WITH ('flink.max-continuous-empty-commits'='100000')", - TABLE_NAME); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); - tableLoader.open(); - Table table = tableLoader.loadTable(); + Table table = createTable(); DataStream<TableChange> events = env.fromSource( - new MonitorSource(tableLoader, HIGH_RATE, Long.MAX_VALUE), + new MonitorSource(tableLoader(), HIGH_RATE, Long.MAX_VALUE), WatermarkStrategy.noWatermarks(), "TableChangeSource") .forceNonParallel(); @@ -176,8 +158,9 @@ void testSource() throws Exception { /** Check that the {@link MonitorSource} operator state is restored correctly.
*/ @Test void testStateRestore(@TempDir File savepointDir) throws Exception { - sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); - sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + Table table = createTable(); + insert(table, 1, "a"); + TableLoader tableLoader = tableLoader(); Configuration config = new Configuration(); config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); @@ -185,8 +168,6 @@ void testStateRestore(@TempDir File savepointDir) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.enableCheckpointing(1000); - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); - tableLoader.open(); DataStream<TableChange> events = env.fromSource( new MonitorSource(tableLoader, HIGH_RATE, Long.MAX_VALUE), @@ -268,14 +249,12 @@ void testStateRestore(@TempDir File savepointDir) throws Exception { @Test void testNotOneParallelismThrows() { - sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + createTable(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); - tableLoader.open(); env.fromSource( - new MonitorSource(tableLoader, HIGH_RATE, Long.MAX_VALUE), + new MonitorSource(tableLoader(), HIGH_RATE, Long.MAX_VALUE), WatermarkStrategy.noWatermarks(), "TableChangeSource") .setParallelism(2) @@ -289,14 +268,13 @@ void testNotOneParallelismThrows() { } @Test - void testMaxReadBack() { - sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); - sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); - sql.exec("INSERT INTO %s VALUES (2, 'b')", TABLE_NAME); - sql.exec("INSERT INTO %s VALUES (3, 'c')", TABLE_NAME); + void testMaxReadBack() throws IOException { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + insert(table, 3, "c"); - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); - tableLoader.open(); + TableLoader tableLoader = tableLoader(); MonitorSource.TableChangeIterator iterator = new MonitorSource.TableChangeIterator(tableLoader, null, 1); @@ -316,12 +294,11 @@ void testMaxReadBack() { } @Test - void testSkipReplace() { - sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); - sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + void testSkipReplace() throws IOException { + Table table = createTable(); + insert(table, 1, "a"); - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); - tableLoader.open(); + TableLoader tableLoader = tableLoader(); MonitorSource.TableChangeIterator iterator = new MonitorSource.TableChangeIterator(tableLoader, null, Long.MAX_VALUE); @@ -330,7 +307,6 @@ void testSkipReplace() { assertThat(iterator.next().commitCount()).isEqualTo(1); // Create a DataOperations.REPLACE snapshot - Table table = tableLoader.loadTable(); DataFile dataFile = table.snapshots().iterator().next().addedDataFiles(table.io()).iterator().next(); RewriteFiles rewrite = tableLoader.loadTable().newRewrite(); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java index fba4a12d9c6b..a70d27279460 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java @@ -18,19 +18,13 @@ */ package
org.apache.iceberg.flink.maintenance.operator; -import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME; -import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME; -import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME_2; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED; -import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED; import static org.assertj.core.api.Assertions.assertThat; -import java.io.IOException; import java.time.Duration; -import java.util.Iterator; import java.util.List; import java.util.stream.Stream; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -41,13 +35,14 @@ import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.iceberg.SerializableTable; -import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -56,34 +51,24 @@ class TestTriggerManager extends OperatorTestBase { private static final long DELAY = 10L; - private static final String NAME_1 = "name1"; - private static final String NAME_2 = "name2"; + private static final String[] TASKS = new String[] {"task0", "task1"}; private long processingTime = 0L; - private TriggerLockFactory lockFactory; private TriggerLockFactory.Lock lock; private TriggerLockFactory.Lock recoveringLock; + private String tableName; @BeforeEach void before() { - sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); - this.lockFactory = lockFactory(); - lockFactory.open(); - this.lock = lockFactory.createLock(); - this.recoveringLock = lockFactory.createRecoveryLock(); - lock.unlock(); - recoveringLock.unlock(); - MetricsReporterFactoryForTests.reset(); - } - - @AfterEach - void after() throws IOException { - lockFactory.close(); + Table table = createTable(); + this.lock = LOCK_FACTORY.createLock(); + this.recoveringLock = LOCK_FACTORY.createRecoveryLock(); + this.tableName = table.name(); } @Test void testCommitCount() throws Exception { TriggerManager manager = - manager(sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().commitCount(3).build()); + manager(tableLoader(), new TriggerEvaluator.Builder().commitCount(3).build()); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { testHarness.open(); @@ -104,8 +89,7 @@ void testCommitCount() throws Exception { @Test void testDataFileCount()
throws Exception { TriggerManager manager = - manager( - sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().dataFileCount(3).build()); + manager(tableLoader(), new TriggerEvaluator.Builder().dataFileCount(3).build()); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { testHarness.open(); @@ -126,9 +110,7 @@ void testDataFileCount() throws Exception { @Test void testDataFileSizeInBytes() throws Exception { TriggerManager manager = - manager( - sql.tableLoader(TABLE_NAME), - new TriggerEvaluator.Builder().dataFileSizeInBytes(3).build()); + manager(tableLoader(), new TriggerEvaluator.Builder().dataFileSizeInBytes(3).build()); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { testHarness.open(); @@ -147,9 +129,7 @@ void testDataFileSizeInBytes() throws Exception { @Test void testPosDeleteFileCount() throws Exception { TriggerManager manager = - manager( - sql.tableLoader(TABLE_NAME), - new TriggerEvaluator.Builder().posDeleteFileCount(3).build()); + manager(tableLoader(), new TriggerEvaluator.Builder().posDeleteFileCount(3).build()); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { testHarness.open(); @@ -170,9 +150,7 @@ void testPosDeleteFileCount() throws Exception { @Test void testPosDeleteRecordCount() throws Exception { TriggerManager manager = - manager( - sql.tableLoader(TABLE_NAME), - new TriggerEvaluator.Builder().posDeleteRecordCount(3).build()); + manager(tableLoader(), new TriggerEvaluator.Builder().posDeleteRecordCount(3).build()); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { testHarness.open(); @@ -196,9 +174,7 @@ void testPosDeleteRecordCount() throws Exception { @Test void testEqDeleteFileCount() throws Exception { TriggerManager manager = - manager( - sql.tableLoader(TABLE_NAME), - new TriggerEvaluator.Builder().eqDeleteFileCount(3).build()); + manager(tableLoader(), new TriggerEvaluator.Builder().eqDeleteFileCount(3).build()); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { testHarness.open(); @@ -219,9 +195,7 @@ void testEqDeleteFileCount() throws Exception { @Test void testEqDeleteRecordCount() throws Exception { TriggerManager manager = - manager( - sql.tableLoader(TABLE_NAME), - new TriggerEvaluator.Builder().eqDeleteRecordCount(3).build()); + manager(tableLoader(), new TriggerEvaluator.Builder().eqDeleteRecordCount(3).build()); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { testHarness.open(); @@ -241,8 +215,7 @@ void testEqDeleteRecordCount() throws Exception { void testTimeout() throws Exception { TriggerManager manager = manager( - sql.tableLoader(TABLE_NAME), - new TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build()); + tableLoader(), new TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build()); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { testHarness.open(); @@ -281,7 +254,7 @@ void testTimeout() throws Exception { @Test void testStateRestore() throws Exception { - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TableLoader tableLoader = tableLoader(); TriggerManager manager = manager(tableLoader); OperatorSubtaskState state; try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = @@ -319,7 +292,7 @@ void testStateRestore() throws Exception { @Test void testMinFireDelay() throws Exception { - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TableLoader tableLoader = tableLoader(); TriggerManager manager =
manager(tableLoader, DELAY, 1); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { @@ -339,7 +312,7 @@ void testMinFireDelay() throws Exception { @Test void testLockCheckDelay() throws Exception { - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TableLoader tableLoader = tableLoader(); TriggerManager manager = manager(tableLoader, 1, DELAY); try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = harness(manager)) { @@ -372,7 +345,7 @@ void testLockCheckDelay() throws Exception { @ParameterizedTest @MethodSource("parametersForTestRecovery") void testRecovery(boolean locked, boolean runningTask) throws Exception { - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TableLoader tableLoader = tableLoader(); TriggerManager manager = manager(tableLoader); OperatorSubtaskState state; try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = @@ -423,18 +396,14 @@ void testRecovery(boolean locked, boolean runningTask) throws Exception { ++processingTime; testHarness.setProcessingTime(processingTime); // Releasing lock will create a new snapshot, and we receive this in the trigger - expected.add( - Trigger.create( - processingTime, - (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()), - 0)); + expected.add(Trigger.create(processingTime, 0)); assertTriggers(testHarness.extractOutputValues(), expected); } } @Test void testTriggerMetrics() throws Exception { - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TableLoader tableLoader = tableLoader(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ManualSource<TableChange> source = @@ -444,8 +413,8 @@ void testTriggerMetrics() throws Exception { TriggerManager manager = new TriggerManager( tableLoader, - lockFactory, - Lists.newArrayList(NAME_1, NAME_2), + LOCK_FACTORY, + Lists.newArrayList(TASKS), Lists.newArrayList( new TriggerEvaluator.Builder().commitCount(2).build(), new TriggerEvaluator.Builder().commitCount(4).build()), @@ -455,7 +424,7 @@ .dataStream() .keyBy(unused -> true) .process(manager) - .name(DUMMY_NAME) + .name(DUMMY_TASK_NAME) .forceNonParallel() .sinkTo(sink); @@ -471,7 +440,7 @@ () -> { Long notingCounter = MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER); + ImmutableList.of(DUMMY_TASK_NAME, tableName, NOTHING_TO_TRIGGER)); return notingCounter != null && notingCounter.equals(1L); }); @@ -480,7 +449,8 @@ // Wait until we receive the trigger assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); assertThat( - MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED)) + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED))) .isEqualTo(1L); lock.unlock(); @@ -492,20 +462,22 @@ assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); lock.unlock(); assertThat( - MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED)) + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED))) .isEqualTo(2L); assertThat( - MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_2 + "."
+ TRIGGERED)) + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1", TRIGGERED))) .isEqualTo(1L); // Final check all the counters MetricsReporterFactoryForTests.assertCounters( - new ImmutableMap.Builder<String, Long>() - .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, -1L) - .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED, -1L) - .put(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED, 2L) - .put(DUMMY_NAME + "." + NAME_2 + "." + TRIGGERED, 1L) - .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER, 1L) + new ImmutableMap.Builder<List<String>, Long>() + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, RATE_LIMITER_TRIGGERED), -1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, CONCURRENT_RUN_THROTTLED), -1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED), 2L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1", TRIGGERED), 1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, NOTHING_TO_TRIGGER), 1L) .build()); } finally { closeJobClient(jobClient); } @@ -514,7 +486,7 @@ @Test void testRateLimiterMetrics() throws Exception { - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TableLoader tableLoader = tableLoader(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ManualSource<TableChange> source = @@ -527,7 +499,7 @@ void testRateLimiterMetrics() throws Exception { .dataStream() .keyBy(unused -> true) .process(manager) - .name(DUMMY_NAME) + .name(DUMMY_TASK_NAME) .forceNonParallel() .sinkTo(sink); @@ -548,7 +520,7 @@ void testRateLimiterMetrics() throws Exception { .until( () -> MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED) + ImmutableList.of(DUMMY_TASK_NAME, tableName, RATE_LIMITER_TRIGGERED)) .equals(1L)); // Final check all the counters @@ -560,7 +532,7 @@ @Test void testConcurrentRunMetrics() throws Exception { - TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TableLoader tableLoader = tableLoader(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ManualSource<TableChange> source = @@ -573,7 +545,7 @@ void testConcurrentRunMetrics() throws Exception { .dataStream() .keyBy(unused -> true) .process(manager) - .name(DUMMY_NAME) + .name(DUMMY_TASK_NAME) .forceNonParallel() .sinkTo(sink); @@ -591,7 +563,7 @@ void testConcurrentRunMetrics() throws Exception { .until( () -> MetricsReporterFactoryForTests.counter( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED) + ImmutableList.of(DUMMY_TASK_NAME, tableName, CONCURRENT_RUN_THROTTLED)) .equals(1L)); // Final check all the counters @@ -611,15 +583,15 @@ private static Stream<Arguments> parametersForTestRecovery() { private void assertCounters(long rateLimiterTrigger, long concurrentRunTrigger) { MetricsReporterFactoryForTests.assertCounters( - new ImmutableMap.Builder<String, Long>() + new ImmutableMap.Builder<List<String>, Long>() .put( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, + ImmutableList.of(DUMMY_TASK_NAME, tableName, RATE_LIMITER_TRIGGERED), rateLimiterTrigger) .put( - DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED, + ImmutableList.of(DUMMY_TASK_NAME, tableName, CONCURRENT_RUN_THROTTLED), concurrentRunTrigger) - .put(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED, 1L) - .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "."
+ NOTHING_TO_TRIGGER, 0L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED), 1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, NOTHING_TO_TRIGGER), 0L) .build()); } @@ -644,15 +616,20 @@ private void addEventAndCheckResult( private TriggerManager manager(TableLoader tableLoader, TriggerEvaluator evaluator) { return new TriggerManager( - tableLoader, lockFactory, Lists.newArrayList(NAME_1), Lists.newArrayList(evaluator), 1, 1); + tableLoader, + LOCK_FACTORY, + Lists.newArrayList(TASKS[0]), + Lists.newArrayList(evaluator), + 1, + 1); } private TriggerManager manager( TableLoader tableLoader, long minFireDelayMs, long lockCheckDelayMs) { return new TriggerManager( tableLoader, - lockFactory, - Lists.newArrayList(NAME_1), + LOCK_FACTORY, + Lists.newArrayList(TASKS[0]), Lists.newArrayList(new TriggerEvaluator.Builder().commitCount(2).build()), minFireDelayMs, lockCheckDelayMs); @@ -670,17 +647,6 @@ private static void assertTriggers(List<Trigger> expected, List<Trigger> actual) assertThat(actualTrigger.timestamp()).isEqualTo(expectedTrigger.timestamp()); assertThat(actualTrigger.taskId()).isEqualTo(expectedTrigger.taskId()); assertThat(actualTrigger.isRecovery()).isEqualTo(expectedTrigger.isRecovery()); - if (expectedTrigger.table() == null) { - assertThat(actualTrigger.table()).isNull(); - } else { - Iterator<Snapshot> expectedSnapshots = expectedTrigger.table().snapshots().iterator(); - Iterator<Snapshot> actualSnapshots = actualTrigger.table().snapshots().iterator(); - while (expectedSnapshots.hasNext()) { - assertThat(actualSnapshots.hasNext()).isTrue(); - assertThat(expectedSnapshots.next().snapshotId()) - .isEqualTo(actualSnapshots.next().snapshotId()); - } - } } } }
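
Reviewer note: the in-memory MemoryLockFactory and TestingLockFactory in these tests exercise the same lifecycle contract as the production JdbcLockFactory. Below is a minimal sketch of that contract, using only the TriggerLockFactory methods that appear in this diff (open(), createLock(), tryLock(), unlock(), close()); the LockLifecycleSketch class and the runGuarded()/maintenanceTask names are illustrative only, not part of this PR:

import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;

class LockLifecycleSketch {
  static void runGuarded(TriggerLockFactory factory, Runnable maintenanceTask) throws Exception {
    factory.open(); // initializes the backing store for the maintenance and recovery locks
    try {
      TriggerLockFactory.Lock lock = factory.createLock();
      if (lock.tryLock()) { // returns false while a previous run still holds the lock
        try {
          maintenanceTask.run();
        } finally {
          lock.unlock(); // in the real job graph this is the LockRemover operator's job
        }
      }
    } finally {
      factory.close();
    }
  }
}

This is the sequencing the tests above split between TriggerManager (tryLock before emitting a Trigger) and LockRemover (unlock once the corresponding TaskResult arrives).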
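A second note on the metric renaming that runs through TestLockRemover and TestTriggerManager: the old dot-joined String identifiers (DUMMY_NAME + "." + task + "." + counter) are replaced by structured List<String> keys. A small sketch of the assumed key shapes, derived only from the assertions above; the MetricKeySketch class and its helper names are hypothetical:

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

class MetricKeySketch {
  // Per-task counters carry five components, matching keys like
  // ImmutableList.of(DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", SUCCEEDED_TASK_COUNTER).
  static List<String> taskKey(
      String taskName, String tableName, String task, int taskIndex, String counter) {
    return ImmutableList.of(taskName, tableName, task, String.valueOf(taskIndex), counter);
  }

  // Table-level counters such as NOTHING_TO_TRIGGER or RATE_LIMITER_TRIGGERED omit the
  // task and index components: [taskName, tableName, counter].
  static List<String> tableKey(String taskName, String tableName, String counter) {
    return ImmutableList.of(taskName, tableName, counter);
  }
}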