diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/AsyncDeleteFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/AsyncDeleteFiles.java new file mode 100644 index 000000000000..af40919a8f96 --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/AsyncDeleteFiles.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.function.Predicate; +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Delete the files using the {@link FileIO}. */ +@Internal +public class AsyncDeleteFiles extends RichAsyncFunction { + private static final Logger LOG = LoggerFactory.getLogger(AsyncDeleteFiles.class); + public static final Predicate> FAILED_PREDICATE = new FailedPredicate(); + + private final String name; + private final FileIO io; + private final int workerPoolSize; + private final String tableName; + + private transient ExecutorService workerPool; + private transient Counter failedCounter; + private transient Counter succeededCounter; + + public AsyncDeleteFiles(String name, TableLoader tableLoader, int workerPoolSize) { + Preconditions.checkNotNull(name, "Name should no be null"); + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + + this.name = name; + tableLoader.open(); + Table table = tableLoader.loadTable(); + this.io = table.io(); + this.workerPoolSize = workerPoolSize; + this.tableName = table.name(); + } + + @Override + public void open(Configuration parameters) throws Exception { + this.failedCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER); + this.succeededCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER); + + this.workerPool = + ThreadPools.newWorkerPool(tableName + "-" + name + "-async-delete-files", workerPoolSize); + } + + @Override + public void asyncInvoke(String fileName, ResultFuture resultFuture) { + workerPool.execute( + () -> { + try { + LOG.info("Deleting file: {} with {}", fileName, name); + io.deleteFile(fileName); + resultFuture.complete(Collections.singletonList(true)); + succeededCounter.inc(); + } catch (Throwable e) { + LOG.info("Failed to delete file {} with {}", fileName, name, e); + resultFuture.complete(Collections.singletonList(false)); + failedCounter.inc(); + } + }); + } + + private static class FailedPredicate implements Predicate>, Serializable { + @Override + public boolean test(Collection collection) { + return collection.size() != 1 || !collection.iterator().next(); + } + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java new file mode 100644 index 000000000000..bbd6e9775557 --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could + * be removed in the {@link #DELETE_STREAM} side output. + */ +public class ExpireSnapshotsProcessor extends ProcessFunction { + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class); + public static final OutputTag DELETE_STREAM = + new OutputTag<>("delete-stream", Types.STRING); + + private final TableLoader tableLoader; + private final Long minAgeMs; + private final Integer retainLast; + private final int plannerPoolSize; + private transient ExecutorService plannerPool; + private transient Table table; + + public ExpireSnapshotsProcessor( + TableLoader tableLoader, Long minAgeMs, Integer retainLast, int plannerPoolSize) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + + this.tableLoader = tableLoader; + this.minAgeMs = minAgeMs; + this.retainLast = retainLast; + this.plannerPoolSize = plannerPoolSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + tableLoader.open(); + this.table = tableLoader.loadTable(); + this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize); + } + + @Override + public void processElement(Trigger trigger, Context ctx, Collector out) + throws Exception { + try { + table.refresh(); + ExpireSnapshots expireSnapshots = table.expireSnapshots(); + if (minAgeMs != null) { + expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - minAgeMs); + } + + if (retainLast != null) { + expireSnapshots = expireSnapshots.retainLast(retainLast); + } + + expireSnapshots + .planWith(plannerPool) + .deleteWith(file -> ctx.output(DELETE_STREAM, file)) + .cleanExpiredFiles(true) + .commit(); + + LOG.info("Successfully finished expiring snapshots for {} at {}", table, ctx.timestamp()); + out.collect( + new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList())); + } catch (Exception e) { + LOG.info("Exception expiring snapshots for {} at {}", table, ctx.timestamp(), e); + out.collect( + new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList(e))); + } + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java index 89efffa15f16..d74b2349b1de 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java @@ -43,7 +43,7 @@ /** Monitors an Iceberg table for changes */ @Internal -class MonitorSource extends SingleThreadedIteratorSource { +public class MonitorSource extends SingleThreadedIteratorSource { private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class); private final TableLoader tableLoader; @@ -58,7 +58,7 @@ class MonitorSource extends SingleThreadedIteratorSource { * @param rateLimiterStrategy limits the frequency the table is checked * @param maxReadBack sets the number of snapshots read before stopping change collection */ - MonitorSource( + public MonitorSource( TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) { Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should no be null"); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java index 773b34b6c495..25c3c1028113 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java @@ -28,7 +28,7 @@ /** Event describing changes in an Iceberg table */ @Internal -class TableChange { +public class TableChange { private int dataFileCount; private long dataFileSizeInBytes; private int posDeleteFileCount; @@ -87,7 +87,7 @@ static TableChange empty() { return new TableChange(0, 0L, 0, 0L, 0, 0L, 0); } - static Builder builder() { + public static Builder builder() { return new Builder(); } @@ -115,7 +115,7 @@ long eqDeleteRecordCount() { return eqDeleteRecordCount; } - public int commitCount() { + int commitCount() { return commitCount; } @@ -183,7 +183,7 @@ public int hashCode() { commitCount); } - static class Builder { + public static class Builder { private int dataFileCount = 0; private long dataFileSizeInBytes = 0L; private int posDeleteFileCount = 0; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java index 1a04461aed43..c57ed5092504 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceMetrics.java @@ -33,6 +33,10 @@ public class TableMaintenanceMetrics { public static final String FAILED_TASK_COUNTER = "failedTasks"; public static final String LAST_RUN_DURATION_MS = "lastRunDurationMs"; + // DeleteFiles metrics + public static final String DELETE_FILE_FAILED_COUNTER = "deleteFailed"; + public static final String DELETE_FILE_SUCCEEDED_COUNTER = "deleteSucceeded"; + private TableMaintenanceMetrics() { // do not instantiate } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java index 85c6c8dbdd55..b4556dc1beab 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java @@ -23,7 +23,7 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; @Internal -class Trigger { +public class Trigger { private final long timestamp; private final SerializableTable table; private final Integer taskId; @@ -36,7 +36,7 @@ private Trigger(long timestamp, SerializableTable table, Integer taskId, boolean this.isRecovery = isRecovery; } - static Trigger create(long timestamp, SerializableTable table, int taskId) { + public static Trigger create(long timestamp, SerializableTable table, int taskId) { return new Trigger(timestamp, table, taskId, false); } @@ -44,7 +44,7 @@ static Trigger recovery(long timestamp) { return new Trigger(timestamp, null, null, true); } - long timestamp() { + public long timestamp() { return timestamp; } @@ -52,7 +52,7 @@ SerializableTable table() { return table; } - Integer taskId() { + public Integer taskId() { return taskId; } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java index dba33b22a42a..d448898bdfe6 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerEvaluator.java @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory; @Internal -class TriggerEvaluator implements Serializable { +public class TriggerEvaluator implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(TriggerEvaluator.class); private final List predicates; @@ -50,7 +50,7 @@ boolean check(TableChange event, long lastTimeMs, long currentTimeMs) { return result; } - static class Builder implements Serializable { + public static class Builder implements Serializable { private Integer dataFileCount; private Long dataFileSizeInBytes; private Integer posDeleteFileCount; @@ -95,12 +95,12 @@ public Builder commitCount(int newCommitCount) { return this; } - Builder timeout(Duration newTimeout) { + public Builder timeout(Duration newTimeout) { this.timeout = newTimeout; return this; } - TriggerEvaluator build() { + public TriggerEvaluator build() { List predicates = Lists.newArrayList(); if (dataFileCount != null) { predicates.add((change, unused, unused2) -> change.dataFileCount() >= dataFileCount); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java index 329223d27ccf..446e8ce2f2a8 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerLockFactory.java @@ -51,12 +51,11 @@ interface Lock { */ boolean isHeld(); - // TODO: Fix the link to the LockRemover when we have a final name and implementation /** * Releases the lock. Should not fail if the lock is not held by anyone. * - *

Called by LockRemover. Implementations could assume that are no concurrent calls for this - * method. + *

Called by {@link LockRemover}. Implementations could assume that are no concurrent calls + * for this method. */ void unlock(); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java index dc95b27af0a6..6b658d122734 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java @@ -57,7 +57,7 @@ * the timer functions are available, but the key is not used. */ @Internal -class TriggerManager extends KeyedProcessFunction +public class TriggerManager extends KeyedProcessFunction implements CheckpointedFunction { private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); @@ -89,7 +89,7 @@ class TriggerManager extends KeyedProcessFunction private transient int startsFrom = 0; private transient boolean triggered = false; - TriggerManager( + public TriggerManager( TableLoader tableLoader, TriggerLockFactory lockFactory, List maintenanceTaskNames, diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java new file mode 100644 index 000000000000..056d8e7d0b7f --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; + private static final int DELETE_PLANNING_WORKER_POOL_SIZE_DEFAULT = 10; + private static final int DELETE_ATTEMPT_NUM = 10; + private static final int DELETE_WORKER_POOL_SIZE_DEFAULT = 10; + private static final String EXECUTOR_TASK_NAME = "ES Executor"; + @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file"; + + private ExpireSnapshots() { + // Do not instantiate directly + } + + /** Creates the builder for creating a stream which expires snapshots for the table. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder { + private Duration minAge = null; + private Integer retainLast = null; + private int planningWorkerPoolSize = DELETE_PLANNING_WORKER_POOL_SIZE_DEFAULT; + private int deleteAttemptNum = DELETE_ATTEMPT_NUM; + private int deleteWorkerPoolSize = DELETE_WORKER_POOL_SIZE_DEFAULT; + + /** + * The snapshots newer than this age will not be removed. + * + * @param newMinAge of the files to be removed + * @return for chained calls + */ + public Builder minAge(Duration newMinAge) { + this.minAge = newMinAge; + return this; + } + + /** + * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more details description see + * {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}. + * + * @param newRetainLast number of snapshots to retain + * @return for chained calls + */ + public Builder retainLast(int newRetainLast) { + this.retainLast = newRetainLast; + return this; + } + + /** + * The worker pool size used to calculate the files to delete. + * + * @param newPlanningWorkerPoolSize for planning files to delete + * @return for chained calls + */ + public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) { + this.planningWorkerPoolSize = newPlanningWorkerPoolSize; + return this; + } + + /** + * The number of retries on the failed delete attempts. + * + * @param newDeleteAttemptNum number of retries + * @return for chained calls + */ + public Builder deleteAttemptNum(int newDeleteAttemptNum) { + this.deleteAttemptNum = newDeleteAttemptNum; + return this; + } + + /** + * The worker pool size used for deleting files. + * + * @param newDeleteWorkerPoolSize for scanning + * @return for chained calls + */ + public Builder deleteWorkerPoolSize(int newDeleteWorkerPoolSize) { + this.deleteWorkerPoolSize = newDeleteWorkerPoolSize; + return this; + } + + @Override + DataStream buildInternal(DataStream trigger) { + Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null"); + + SingleOutputStreamOperator result = + trigger + .process( + new ExpireSnapshotsProcessor( + tableLoader(), + minAge == null ? null : minAge.toMillis(), + retainLast, + planningWorkerPoolSize)) + .name(EXECUTOR_TASK_NAME) + .uid(uidPrefix() + "-expire-snapshots") + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + AsyncRetryStrategy retryStrategy = + new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder( + deleteAttemptNum, + DELETE_INITIAL_DELAY_MS, + DELETE_MAX_RETRY_DELAY_MS, + DELETE_BACKOFF_MULTIPLIER) + .ifResult(AsyncDeleteFiles.FAILED_PREDICATE) + .build(); + + AsyncDataStream.unorderedWaitWithRetry( + result.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM).rebalance(), + new AsyncDeleteFiles(name(), tableLoader(), deleteWorkerPoolSize), + DELETE_TIMEOUT_MS, + TimeUnit.MILLISECONDS, + deleteWorkerPoolSize, + retryStrategy) + .name(DELETE_FILES_TASK_NAME) + .uid(uidPrefix() + "-delete-expired-files") + .slotSharingGroup(slotSharingGroup()) + .setParallelism(parallelism()); + + // Deleting the files is asynchronous, so we ignore the results when calculating the return + // value + return result; + } + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java new file mode 100644 index 000000000000..d02ba7662b38 --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public abstract class MaintenanceTaskBuilder { + private int id; + private String name; + private TableLoader tableLoader; + private String uidPrefix = null; + private String slotSharingGroup = null; + private Integer parallelism = null; + private TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder(); + + abstract DataStream buildInternal(DataStream sourceStream); + + /** + * After a given number of Iceberg table commits since the last run, starts the downstream job. + * + * @param commitCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnCommitCount(int commitCount) { + triggerEvaluator.commitCount(commitCount); + return (T) this; + } + + /** + * After a given number of new data files since the last run, starts the downstream job. + * + * @param dataFileCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnDataFileCount(int dataFileCount) { + triggerEvaluator.dataFileCount(dataFileCount); + return (T) this; + } + + /** + * After a given aggregated data file size since the last run, starts the downstream job. + * + * @param dataFileSizeInBytes after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnDataFileSize(long dataFileSizeInBytes) { + triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes); + return (T) this; + } + + /** + * After a given number of new positional delete files since the last run, starts the downstream + * job. + * + * @param posDeleteFileCount after the downstream job should be started + * @return for chained calls + */ + public T schedulerOnPosDeleteFileCount(int posDeleteFileCount) { + triggerEvaluator.posDeleteFileCount(posDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new positional delete records since the last run, starts the downstream + * job. + * + * @param posDeleteRecordCount after the downstream job should be started + * @return for chained calls + */ + public T schedulerOnPosDeleteRecordCount(long posDeleteRecordCount) { + triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount); + return (T) this; + } + + /** + * After a given number of new equality delete files since the last run, starts the downstream + * job. + * + * @param eqDeleteFileCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) { + triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new equality delete records since the last run, starts the downstream + * job. + * + * @param eqDeleteRecordCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) { + triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount); + return (T) this; + } + + /** + * After a given time since the last run, starts the downstream job. + * + * @param time after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnTime(Duration time) { + triggerEvaluator.timeout(time); + return (T) this; + } + + /** + * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. + * + * @param newUidPrefix for the transformations + * @return for chained calls + */ + public T uidPrefix(String newUidPrefix) { + this.uidPrefix = newUidPrefix; + return (T) this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + * @return for chained calls + */ + public T slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return (T) this; + } + + /** + * Sets the parallelism for the stream. + * + * @param newParallelism the required parallelism + * @return for chained calls + */ + public T parallelism(int newParallelism) { + this.parallelism = newParallelism; + return (T) this; + } + + @Internal + int id() { + return id; + } + + @Internal + String name() { + return name; + } + + @Internal + TableLoader tableLoader() { + return tableLoader; + } + + @Internal + String uidPrefix() { + return uidPrefix; + } + + @Internal + String slotSharingGroup() { + return slotSharingGroup; + } + + @Internal + Integer parallelism() { + return parallelism; + } + + @Internal + TriggerEvaluator evaluator() { + return triggerEvaluator.build(); + } + + @Internal + DataStream build( + DataStream sourceStream, + int newId, + String newName, + TableLoader newTableLoader, + String mainUidPrefix, + String mainSlotSharingGroup, + int mainParallelism) { + Preconditions.checkArgument( + parallelism == null || parallelism == -1 || parallelism > 0, + "Parallelism should be left to default (-1/null) or greater than 0"); + Preconditions.checkNotNull(newName, "Name should not be null"); + Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null"); + + this.id = newId; + this.name = newName; + this.tableLoader = newTableLoader; + + if (uidPrefix == null) { + uidPrefix = mainUidPrefix + "_" + name + "_" + id; + } + + if (parallelism == null) { + parallelism = mainParallelism; + } + + if (slotSharingGroup == null) { + slotSharingGroup = mainSlotSharingGroup; + } + + tableLoader.open(); + + return buildInternal(sourceStream); + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java new file mode 100644 index 000000000000..9ba6a9f67eff --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.util.Collector; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. */ +public class TableMaintenance { + private static final String TASK_NAME_FORMAT = "%s [%d]"; + static final String SOURCE_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager"; + static final String LOCK_REMOVER_TASK_NAME = "Lock remover"; + + private TableMaintenance() { + // Do not instantiate directly + } + + /** + * Use when the change stream is already provided, like in the {@link + * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}. + * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder builder( + DataStream changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(changeStream, tableLoader, lockFactory); + } + + /** + * Creates the default monitor source for collecting the table changes and returns a builder for + * the maintenance stream. + * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder builder( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream changeStream; + private final TableLoader tableLoader; + private final List> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidPrefix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMillis(1); + private Duration concurrentCheckDelay = Duration.ofSeconds(30); + private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private int maxReadBack = 100; + + private Builder( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + this.env = env; + this.changeStream = null; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + private Builder( + DataStream changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + this.env = null; + this.changeStream = changeStream; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + /** + * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. + * + * @param newUidPrefix for the transformations + * @return for chained calls + */ + public Builder uidPrefix(String newUidPrefix) { + this.uidPrefix = newUidPrefix; + return this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + * @return for chained calls + */ + public Builder slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return this; + } + + /** + * Limits the firing frequency for the task triggers. + * + * @param newRateLimit firing frequency + * @return for chained calls + */ + public Builder rateLimit(Duration newRateLimit) { + Preconditions.checkNotNull(rateLimit.toMillis() > 0, "Rate limit should be greater than 0"); + this.rateLimit = newRateLimit; + return this; + } + + /** + * Sets the delay for checking lock availability when a concurrent run is detected. + * + * @param newConcurrentCheckDelay firing frequency + * @return for chained calls + */ + public Builder concurrentCheckDelay(Duration newConcurrentCheckDelay) { + this.concurrentCheckDelay = newConcurrentCheckDelay; + return this; + } + + /** + * Sets the global parallelism of maintenance tasks. Could be overwritten by the {@link + * MaintenanceTaskBuilder#parallelism(int)}. + * + * @param newParallelism task parallelism + * @return for chained calls + */ + public Builder parallelism(int newParallelism) { + this.parallelism = newParallelism; + return this; + } + + /** + * Maximum number of snapshots checked when started with an embedded {@link MonitorSource} at + * the first time. Only available when the {@link MonitorSource} is generated by the builder. + * + * @param newMaxReadBack snapshots to consider when initializing + * @return for chained calls + */ + public Builder maxReadBack(int newMaxReadBack) { + Preconditions.checkArgument( + changeStream == null, "Can't set maxReadBack when change stream is provided"); + this.maxReadBack = newMaxReadBack; + return this; + } + + /** + * Adds a specific task with the given schedule. + * + * @param task to add + * @return for chained calls + */ + public Builder add(MaintenanceTaskBuilder task) { + taskBuilders.add(task); + return this; + } + + /** Builds the task graph for the maintenance tasks. */ + public void append() { + Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task"); + Preconditions.checkNotNull(uidPrefix, "Uid prefix should no be null"); + + DataStream sourceStream; + if (changeStream == null) { + // Create a monitor source to provide the TableChange stream + MonitorSource source = + new MonitorSource( + tableLoader, + RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), + maxReadBack); + sourceStream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_NAME) + .uid(uidPrefix + "-monitor-source") + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + } else { + sourceStream = changeStream.global(); + } + + // Chain the TriggerManager + List taskNames = Lists.newArrayListWithCapacity(taskBuilders.size()); + List evaluators = Lists.newArrayListWithCapacity(taskBuilders.size()); + for (int i = 0; i < taskBuilders.size(); ++i) { + taskNames.add(nameFor(taskBuilders.get(i), i)); + evaluators.add(taskBuilders.get(i).evaluator()); + } + + DataStream triggers = + // Add TriggerManager to schedule the tasks + DataStreamUtils.reinterpretAsKeyedStream(sourceStream, unused -> true) + .process( + new TriggerManager( + tableLoader, + lockFactory, + taskNames, + evaluators, + rateLimit.toMillis(), + concurrentCheckDelay.toMillis())) + .name(TRIGGER_MANAGER_TASK_NAME) + .uid(uidPrefix + "-trigger-manager") + .slotSharingGroup(slotSharingGroup) + .forceNonParallel() + // Add a watermark after every trigger + .assignTimestampsAndWatermarks(new WindowClosingWatermarkStrategy()) + .name("Watermark Assigner") + .uid(uidPrefix + "-watermark-assigner") + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + + // Add the specific tasks + DataStream unioned = null; + for (int i = 0; i < taskBuilders.size(); ++i) { + DataStream filtered = + triggers + .flatMap(new TaskFilter(i)) + .name("Filter " + i) + .forceNonParallel() + .uid(uidPrefix + "-filter-" + i) + .slotSharingGroup(slotSharingGroup); + MaintenanceTaskBuilder builder = taskBuilders.get(i); + DataStream result = + builder.build( + filtered, + i, + taskNames.get(i), + tableLoader, + uidPrefix, + slotSharingGroup, + parallelism); + if (unioned == null) { + unioned = result; + } else { + unioned = unioned.union(result); + } + } + + // Add the LockRemover to the end + unioned + .global() + .transform( + LOCK_REMOVER_TASK_NAME, + TypeInformation.of(Void.class), + new LockRemover(lockFactory, taskNames)) + .forceNonParallel() + .uid(uidPrefix + "-lock-remover") + .slotSharingGroup(slotSharingGroup); + } + } + + private static String nameFor(MaintenanceTaskBuilder streamBuilder, int taskId) { + return String.format(TASK_NAME_FORMAT, streamBuilder.getClass().getSimpleName(), taskId); + } + + @Internal + public static class WindowClosingWatermarkStrategy implements WatermarkStrategy { + @Override + public WatermarkGenerator createWatermarkGenerator( + WatermarkGeneratorSupplier.Context context) { + return new WatermarkGenerator<>() { + @Override + public void onEvent(Trigger event, long eventTimestamp, WatermarkOutput output) { + output.emitWatermark(new Watermark(event.timestamp())); + } + + @Override + public void onPeriodicEmit(WatermarkOutput output) { + // No periodic watermarks + } + }; + } + + @Override + public TimestampAssigner createTimestampAssigner( + TimestampAssignerSupplier.Context context) { + return (element, unused) -> element.timestamp(); + } + } + + @Internal + private static class TaskFilter implements FlatMapFunction { + private final int taskId; + + private TaskFilter(int taskId) { + this.taskId = taskId; + } + + @Override + public void flatMap(Trigger trigger, Collector out) { + if (trigger.taskId() != null && trigger.taskId() == taskId) { + out.collect(trigger); + } + } + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java index 9b6580fad0bf..e7e818ba6887 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java @@ -31,7 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; /** Sink for collecting output during testing. */ -class CollectingSink implements Sink { +public class CollectingSink implements Sink { private static final long serialVersionUID = 1L; private static final List> QUEUES = Collections.synchronizedList(Lists.newArrayListWithExpectedSize(1)); @@ -39,7 +39,7 @@ class CollectingSink implements Sink { private final int index; /** Creates a new sink which collects the elements received. */ - CollectingSink() { + public CollectingSink() { this.index = NUM_SINKS.incrementAndGet(); QUEUES.add(new LinkedBlockingQueue<>()); } @@ -69,7 +69,7 @@ boolean isEmpty() { * @return The first element received by this {@link Sink} * @throws TimeoutException if no element received until the timeout */ - T poll(Duration timeout) throws TimeoutException { + public T poll(Duration timeout) throws TimeoutException { Object element; try { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java deleted file mode 100644 index 36e162d4f068..000000000000 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ConstantsForTests.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.maintenance.operator; - -class ConstantsForTests { - public static final long EVENT_TIME = 10L; - static final long EVENT_TIME_2 = 11L; - static final String DUMMY_NAME = "dummy"; - - private ConstantsForTests() { - // Do not instantiate - } -} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java index 91d36aa3e85d..1e8ebcdeb256 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java @@ -125,6 +125,10 @@ public TableLoader tableLoader(String tableName) { return tableLoader; } + public CatalogLoader catalogLoader() { + return catalogLoader; + } + private static String toWithClause(Map props) { return String.format( "(%s)", diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java deleted file mode 100644 index 9cdc55cb0cce..000000000000 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.maintenance.operator; - -import java.io.File; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; -import org.awaitility.Awaitility; - -class FlinkStreamingTestUtils { - private FlinkStreamingTestUtils() { - // Do not instantiate - } - - /** - * Close the {@link JobClient} and wait for the job closure. If the savepointDir is specified, it - * stops the job with a savepoint. - * - * @param jobClient the job to close - * @param savepointDir the savepointDir to store the last savepoint. If null then - * stop without a savepoint. - * @return configuration for restarting the job from the savepoint - */ - static Configuration closeJobClient(JobClient jobClient, File savepointDir) { - Configuration conf = new Configuration(); - if (jobClient != null) { - if (savepointDir != null) { - // Stop with savepoint - jobClient.stopWithSavepoint(false, savepointDir.getPath(), SavepointFormatType.CANONICAL); - // Wait until the savepoint is created and the job has been stopped - Awaitility.await().until(() -> savepointDir.listFiles(File::isDirectory).length == 1); - conf.set( - SavepointConfigOptions.SAVEPOINT_PATH, - savepointDir.listFiles(File::isDirectory)[0].getAbsolutePath()); - } else { - jobClient.cancel(); - } - - // Wait until the job has been stopped - Awaitility.await().until(() -> jobClient.getJobStatus().get().isTerminalState()); - return conf; - } - - return null; - } - - /** - * Close the {@link JobClient} and wait for the job closure. - * - * @param jobClient the job to close - */ - static void closeJobClient(JobClient jobClient) { - closeJobClient(jobClient, null); - } -} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java index 679b3ec508a2..eff32fcfa118 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java @@ -44,7 +44,7 @@ import org.jetbrains.annotations.Nullable; /** Testing source implementation for Flink sources which can be triggered manually. */ -class ManualSource +public class ManualSource implements Source, ResultTypeQueryable { @@ -65,7 +65,7 @@ class ManualSource * @param env to register the source * @param type of the events returned by the source */ - ManualSource(StreamExecutionEnvironment env, TypeInformation type) { + public ManualSource(StreamExecutionEnvironment env, TypeInformation type) { this.type = type; this.env = env; this.index = numSources++; @@ -78,7 +78,7 @@ class ManualSource * * @param event to emit */ - void sendRecord(T event) { + public void sendRecord(T event) { this.sendInternal(Tuple2.of(event, null)); } @@ -88,7 +88,7 @@ void sendRecord(T event) { * @param event to emit * @param eventTime of the event */ - void sendRecord(T event, long eventTime) { + public void sendRecord(T event, long eventTime) { this.sendInternal(Tuple2.of(event, eventTime)); } @@ -97,7 +97,7 @@ void sendRecord(T event, long eventTime) { * * @param timeStamp of the watermark */ - void sendWatermark(long timeStamp) { + public void sendWatermark(long timeStamp) { this.sendInternal(Tuple2.of(null, timeStamp)); } @@ -112,7 +112,7 @@ void markFinished() { * * @return the stream emitted by this source */ - DataStream dataStream() { + public DataStream dataStream() { if (this.stream == null) { this.stream = this.env diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index 225853086545..63ba6f587462 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -19,27 +19,41 @@ package org.apache.iceberg.flink.maintenance.operator; import static org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; +import java.io.IOException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.iceberg.flink.FlinkCatalogFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; -class OperatorTestBase { +public class OperatorTestBase { private static final int NUMBER_TASK_MANAGERS = 1; private static final int SLOTS_PER_TASK_MANAGER = 8; - private static final TriggerLockFactory.Lock MAINTENANCE_LOCK = new MemoryLock(); - private static final TriggerLockFactory.Lock RECOVERY_LOCK = new MemoryLock(); - static final String TABLE_NAME = "test_table"; + protected static final String UID_PREFIX = "UID-Dummy"; + protected static final String SLOT_SHARING_GROUP = "SlotSharingGroup"; + protected static final String TABLE_NAME = "test_table"; + protected static final TriggerLockFactory LOCK_FACTORY = new MemoryLockFactory(); + + public static final String IGNORED_OPERATOR_NAME = "Ignore"; + + static final long EVENT_TIME = 10L; + static final long EVENT_TIME_2 = 11L; + protected static final String DUMMY_NAME = "dummy"; @RegisterExtension protected static final MiniClusterExtension MINI_CLUSTER_EXTENSION = @@ -51,42 +65,21 @@ class OperatorTestBase { .build()); @RegisterExtension - final FlinkSqlExtension sql = + public final FlinkSqlExtension sql = new FlinkSqlExtension( "catalog", ImmutableMap.of("type", "iceberg", FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hadoop"), "db"); - private static Configuration config() { - Configuration config = new Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG); - MetricOptions.forReporter(config, "test_reporter") - .set(MetricOptions.REPORTER_FACTORY_CLASS, MetricsReporterFactoryForTests.class.getName()); - return config; + @BeforeEach + void before() { + LOCK_FACTORY.open(); + MetricsReporterFactoryForTests.reset(); } - protected static TriggerLockFactory lockFactory() { - return new TriggerLockFactory() { - @Override - public void open() { - MAINTENANCE_LOCK.unlock(); - RECOVERY_LOCK.unlock(); - } - - @Override - public Lock createLock() { - return MAINTENANCE_LOCK; - } - - @Override - public Lock createRecoveryLock() { - return RECOVERY_LOCK; - } - - @Override - public void close() { - // do nothing - } - }; + @AfterEach + void after() throws IOException { + LOCK_FACTORY.close(); } /** @@ -98,7 +91,7 @@ public void close() { * stop without a savepoint. * @return configuration for restarting the job from the savepoint */ - public static Configuration closeJobClient(JobClient jobClient, File savepointDir) { + protected static Configuration closeJobClient(JobClient jobClient, File savepointDir) { Configuration conf = new Configuration(); if (jobClient != null) { if (savepointDir != null) { @@ -126,12 +119,45 @@ public static Configuration closeJobClient(JobClient jobClient, File savepointDi * * @param jobClient the job to close */ - public static void closeJobClient(JobClient jobClient) { + protected static void closeJobClient(JobClient jobClient) { closeJobClient(jobClient, null); } + protected static void checkUidsAreSet(StreamExecutionEnvironment env, String uidPrefix) { + env.getTransformations().stream() + .filter( + t -> !(t instanceof SinkTransformation) && !(t.getName().equals(IGNORED_OPERATOR_NAME))) + .forEach( + transformation -> { + assertThat(transformation.getUid()).isNotNull(); + if (uidPrefix != null) { + assertThat(transformation.getUid()).contains(UID_PREFIX); + } + }); + } + + protected static void checkSlotSharingGroupsAreSet(StreamExecutionEnvironment env, String name) { + String nameToCheck = name != null ? name : StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + + env.getTransformations().stream() + .filter( + t -> !(t instanceof SinkTransformation) && !(t.getName().equals(IGNORED_OPERATOR_NAME))) + .forEach( + t -> { + assertThat(t.getSlotSharingGroup()).isPresent(); + assertThat(t.getSlotSharingGroup().get().getName()).isEqualTo(nameToCheck); + }); + } + + private static Configuration config() { + Configuration config = new Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG); + MetricOptions.forReporter(config, "test_reporter") + .set(MetricOptions.REPORTER_FACTORY_CLASS, MetricsReporterFactoryForTests.class.getName()); + return config; + } + private static class MemoryLock implements TriggerLockFactory.Lock { - boolean locked = false; + volatile boolean locked = false; @Override public boolean tryLock() { @@ -153,4 +179,30 @@ public void unlock() { locked = false; } } + + private static class MemoryLockFactory implements TriggerLockFactory { + private static final TriggerLockFactory.Lock MAINTENANCE_LOCK = new MemoryLock(); + private static final TriggerLockFactory.Lock RECOVERY_LOCK = new MemoryLock(); + + @Override + public void open() { + MAINTENANCE_LOCK.unlock(); + RECOVERY_LOCK.unlock(); + } + + @Override + public Lock createLock() { + return MAINTENANCE_LOCK; + } + + @Override + public Lock createRecoveryLock() { + return RECOVERY_LOCK; + } + + @Override + public void close() { + // do nothing + } + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestAsyncDeleteFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestAsyncDeleteFiles.java new file mode 100644 index 000000000000..3e443fdbbc31 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestAsyncDeleteFiles.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collection; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness; +import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +class TestAsyncDeleteFiles extends OperatorTestBase { + private static final String DUMMY_FILE_NAME = "dummy"; + + @Test + void testDelete() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + Table table = tableLoader.loadTable(); + + // Write an extra files + Path dummyFile = + FileSystems.getDefault().getPath(table.location().substring(5), DUMMY_FILE_NAME); + Files.write(dummyFile, "DUMMY".getBytes(StandardCharsets.UTF_8)); + + List actual = deleteDummyFileAndWait(tableLoader); + + assertThat(actual).isEqualTo(ImmutableList.of(Boolean.TRUE)); + assertThat(Files.exists(dummyFile)).isFalse(); + } + + @Test + void testDeleteMissingFile() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + + List actual = deleteDummyFileAndWait(sql.tableLoader(TABLE_NAME)); + + assertThat(actual).isEqualTo(ImmutableList.of(Boolean.TRUE)); + } + + @Test + void testWrongFile() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + + StreamTaskMailboxTestHarnessBuilder builder = + new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, BasicTypeInfo.BOOLEAN_TYPE_INFO) + .addInput(BasicTypeInfo.STRING_TYPE_INFO); + CountingPredicateChecker predicate = new CountingPredicateChecker(); + + try (StreamTaskMailboxTestHarness testHarness = + builder + .setupOutputForSingletonOperatorChain( + asyncWaitOperatorFactory(sql.tableLoader(TABLE_NAME), predicate)) + .build()) { + testHarness.processElement(new StreamRecord<>("wrong://", System.currentTimeMillis())); + + while (testHarness.getOutput().isEmpty()) { + Awaitility.await().until(() -> testHarness.getOutput().isEmpty()); + testHarness.processAll(); + } + + // Make sure that we do not complete immediately + assertThat(CountingPredicateChecker.calls).isEqualTo(3); + + // The result still should be fail + assertThat( + testHarness.getOutput().stream() + .map(r -> ((StreamRecord) r).getValue()) + .collect(Collectors.toList())) + .isEqualTo(ImmutableList.of(Boolean.FALSE)); + } + } + + private List deleteDummyFileAndWait(TableLoader tableLoader) throws Exception { + Table table = tableLoader.loadTable(); + + try (OneInputStreamOperatorTestHarness testHarness = + new OneInputStreamOperatorTestHarness<>( + asyncWaitOperatorFactory(tableLoader, new CountingPredicateChecker()), + StringSerializer.INSTANCE)) { + testHarness.open(); + testHarness.processElement( + table.location() + "/" + DUMMY_FILE_NAME, System.currentTimeMillis()); + + // wait until all async collectors in the buffer have been emitted out. + testHarness.endInput(); + testHarness.close(); + + return testHarness.extractOutputValues(); + } + } + + private AsyncWaitOperatorFactory asyncWaitOperatorFactory( + TableLoader tableLoader, Predicate> predicate) { + return new AsyncWaitOperatorFactory<>( + new AsyncDeleteFiles(DUMMY_NAME, tableLoader, 10), + 1000000, + 10, + AsyncDataStream.OutputMode.ORDERED, + new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder( + 2, 10, 1000, 1.5) + .ifResult(predicate) + .build()); + } + + private static class CountingPredicateChecker + implements Predicate>, Serializable { + private static int calls = 0; + + @Override + public boolean test(Collection param) { + ++calls; + return AsyncDeleteFiles.FAILED_PREDICATE.test(param); + } + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java new file mode 100644 index 000000000000..79af082a0d44 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Queue; +import java.util.Set; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class TestExpireSnapshotsProcessor extends OperatorTestBase { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testExpire(boolean success) throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar, spec varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a', 'p1')", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (2, 'b', 'p2')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + Table table = tableLoader.loadTable(); + SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table); + + List actual; + Queue> deletes; + try (OneInputStreamOperatorTestHarness testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new ExpireSnapshotsProcessor(tableLoader, 0L, 1, 10))) { + testHarness.open(); + + if (!success) { + // Cause an exception + sql.exec("DROP TABLE IF EXISTS %s", TABLE_NAME); + } + + testHarness.processElement( + Trigger.create(10, serializableTable, 11), System.currentTimeMillis()); + deletes = testHarness.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM); + actual = testHarness.extractOutputValues(); + } + + assertThat(actual).hasSize(1); + TaskResult result = actual.get(0); + assertThat(result.startEpoch()).isEqualTo(10); + assertThat(result.taskIndex()).isEqualTo(11); + assertThat(result.success()).isEqualTo(success); + + if (success) { + assertThat(result.exceptions()).isNotNull().isEmpty(); + + table.refresh(); + Set snapshots = Sets.newHashSet(table.snapshots()); + assertThat(snapshots).hasSize(1); + assertThat(deletes).hasSize(1); + } else { + assertThat(result.exceptions()).isNotNull().hasSize(1); + + assertThat(deletes).isNull(); + } + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java index ccb90ec33d9c..f6f6f932e9d0 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemover.java @@ -18,8 +18,8 @@ */ package org.apache.iceberg.flink.maintenance.operator; -import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.FAILED_TASK_COUNTER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.LAST_RUN_DURATION_MS; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER; import static org.assertj.core.api.Assertions.assertThat; @@ -54,7 +54,7 @@ @Timeout(value = 10) class TestLockRemover extends OperatorTestBase { - private static final String[] TASKS = new String[] {"task0", "task1"}; + private static final String[] TASKS = new String[] {"task0", "task1", "task2"}; private static final TriggerLockFactory.Lock LOCK = new TestingLock(); private static final TriggerLockFactory.Lock RECOVERY_LOCK = new TestingLock(); @@ -135,15 +135,16 @@ void testMetrics() throws Exception { .setParallelism(1); JobClient jobClient = null; + long time = System.currentTimeMillis(); try { jobClient = env.executeAsync(); // Start the 2 successful and one failed result trigger for task1, and 3 successful for task2 - processAndCheck(source, new TaskResult(0, 0L, true, Lists.newArrayList())); - processAndCheck(source, new TaskResult(1, 1L, true, Lists.newArrayList())); - processAndCheck(source, new TaskResult(1, 2L, true, Lists.newArrayList())); - processAndCheck(source, new TaskResult(0, 3L, false, Lists.newArrayList())); - processAndCheck(source, new TaskResult(0, 4L, true, Lists.newArrayList())); - processAndCheck(source, new TaskResult(1, 5L, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(0, time, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(1, 0L, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(1, 0L, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(0, time, false, Lists.newArrayList())); + processAndCheck(source, new TaskResult(0, time, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(1, 0L, true, Lists.newArrayList())); Awaitility.await() .until( @@ -159,7 +160,22 @@ void testMetrics() throws Exception { .put(DUMMY_NAME + "." + TASKS[0] + "." + FAILED_TASK_COUNTER, 1L) .put(DUMMY_NAME + "." + TASKS[1] + "." + SUCCEEDED_TASK_COUNTER, 3L) .put(DUMMY_NAME + "." + TASKS[1] + "." + FAILED_TASK_COUNTER, 0L) + .put(DUMMY_NAME + "." + TASKS[2] + "." + SUCCEEDED_TASK_COUNTER, 0L) + .put(DUMMY_NAME + "." + TASKS[2] + "." + FAILED_TASK_COUNTER, 0L) .build()); + + assertThat( + MetricsReporterFactoryForTests.gauge( + DUMMY_NAME + "." + TASKS[0] + "." + LAST_RUN_DURATION_MS)) + .isPositive(); + assertThat( + MetricsReporterFactoryForTests.gauge( + DUMMY_NAME + "." + TASKS[1] + "." + LAST_RUN_DURATION_MS)) + .isGreaterThan(time); + assertThat( + MetricsReporterFactoryForTests.gauge( + DUMMY_NAME + "." + TASKS[2] + "." + LAST_RUN_DURATION_MS)) + .isZero(); } finally { closeJobClient(jobClient); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java index fba4a12d9c6b..c761c2904186 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java @@ -18,9 +18,6 @@ */ package org.apache.iceberg.flink.maintenance.operator; -import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME; -import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME; -import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME_2; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT; import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER; @@ -28,7 +25,6 @@ import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED; import static org.assertj.core.api.Assertions.assertThat; -import java.io.IOException; import java.time.Duration; import java.util.Iterator; import java.util.List; @@ -47,7 +43,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -56,28 +51,16 @@ class TestTriggerManager extends OperatorTestBase { private static final long DELAY = 10L; - private static final String NAME_1 = "name1"; - private static final String NAME_2 = "name2"; + private static final String[] TASKS = new String[] {"task0", "task1"}; private long processingTime = 0L; - private TriggerLockFactory lockFactory; private TriggerLockFactory.Lock lock; private TriggerLockFactory.Lock recoveringLock; @BeforeEach void before() { sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); - this.lockFactory = lockFactory(); - lockFactory.open(); - this.lock = lockFactory.createLock(); - this.recoveringLock = lockFactory.createRecoveryLock(); - lock.unlock(); - recoveringLock.unlock(); - MetricsReporterFactoryForTests.reset(); - } - - @AfterEach - void after() throws IOException { - lockFactory.close(); + this.lock = LOCK_FACTORY.createLock(); + this.recoveringLock = LOCK_FACTORY.createRecoveryLock(); } @Test @@ -444,8 +427,8 @@ void testTriggerMetrics() throws Exception { TriggerManager manager = new TriggerManager( tableLoader, - lockFactory, - Lists.newArrayList(NAME_1, NAME_2), + LOCK_FACTORY, + Lists.newArrayList(TASKS), Lists.newArrayList( new TriggerEvaluator.Builder().commitCount(2).build(), new TriggerEvaluator.Builder().commitCount(4).build()), @@ -480,7 +463,7 @@ void testTriggerMetrics() throws Exception { // Wait until we receive the trigger assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); assertThat( - MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED)) + MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + TASKS[0] + "." + TRIGGERED)) .isEqualTo(1L); lock.unlock(); @@ -492,10 +475,10 @@ void testTriggerMetrics() throws Exception { assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); lock.unlock(); assertThat( - MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED)) + MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + TASKS[0] + "." + TRIGGERED)) .isEqualTo(2L); assertThat( - MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_2 + "." + TRIGGERED)) + MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + TASKS[1] + "." + TRIGGERED)) .isEqualTo(1L); // Final check all the counters @@ -503,8 +486,8 @@ void testTriggerMetrics() throws Exception { new ImmutableMap.Builder() .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, -1L) .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED, -1L) - .put(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED, 2L) - .put(DUMMY_NAME + "." + NAME_2 + "." + TRIGGERED, 1L) + .put(DUMMY_NAME + "." + TASKS[0] + "." + TRIGGERED, 2L) + .put(DUMMY_NAME + "." + TASKS[1] + "." + TRIGGERED, 1L) .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER, 1L) .build()); } finally { @@ -618,7 +601,7 @@ private void assertCounters(long rateLimiterTrigger, long concurrentRunTrigger) .put( DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_THROTTLED, concurrentRunTrigger) - .put(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED, 1L) + .put(DUMMY_NAME + "." + TASKS[0] + "." + TRIGGERED, 1L) .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER, 0L) .build()); } @@ -644,15 +627,20 @@ private void addEventAndCheckResult( private TriggerManager manager(TableLoader tableLoader, TriggerEvaluator evaluator) { return new TriggerManager( - tableLoader, lockFactory, Lists.newArrayList(NAME_1), Lists.newArrayList(evaluator), 1, 1); + tableLoader, + LOCK_FACTORY, + Lists.newArrayList(TASKS[0]), + Lists.newArrayList(evaluator), + 1, + 1); } private TriggerManager manager( TableLoader tableLoader, long minFireDelayMs, long lockCheckDelayMs) { return new TriggerManager( tableLoader, - lockFactory, - Lists.newArrayList(NAME_1), + LOCK_FACTORY, + Lists.newArrayList(TASKS[0]), Lists.newArrayList(new TriggerEvaluator.Builder().commitCount(2).build()), minFireDelayMs, lockCheckDelayMs); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledBuilderTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledBuilderTestBase.java new file mode 100644 index 000000000000..a2474c0f0809 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledBuilderTestBase.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.function.Supplier; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.operator.CollectingSink; +import org.apache.iceberg.flink.maintenance.operator.ManualSource; +import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.extension.RegisterExtension; + +class ScheduledBuilderTestBase extends OperatorTestBase { + private static final int TESTING_TASK_ID = 0; + private static final Duration POLL_DURATION = Duration.ofSeconds(5); + static final String DB_NAME = "db"; + + @RegisterExtension ScheduledInfraExtension infra = new ScheduledInfraExtension(); + + /** + * Triggers a maintenance tasks and waits for the successful result. The {@link Table} is + * refreshed for convenience reasons. + * + * @param env used for testing + * @param triggerSource used for manually emitting the trigger + * @param collectingSink used for collecting the result + * @param table used for generating the payload + * @throws Exception if any + */ + void runAndWaitForSuccess( + StreamExecutionEnvironment env, + ManualSource triggerSource, + CollectingSink collectingSink, + Supplier checkSideEffect, + Table table) + throws Exception { + table.refresh(); + SerializableTable payload = (SerializableTable) SerializableTable.copyOf(table); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // Do a single task run + long time = System.currentTimeMillis(); + triggerSource.sendRecord(Trigger.create(time, payload, TESTING_TASK_ID), time); + + TaskResult result = collectingSink.poll(POLL_DURATION); + + assertThat(result.startEpoch()).isEqualTo(time); + assertThat(result.success()).isTrue(); + assertThat(result.taskIndex()).isEqualTo(TESTING_TASK_ID); + + Awaitility.await().until(() -> checkSideEffect.get()); + } finally { + closeJobClient(jobClient); + } + + table.refresh(); + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledInfraExtension.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledInfraExtension.java new file mode 100644 index 000000000000..210c003f183a --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledInfraExtension.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import static org.apache.iceberg.flink.maintenance.operator.OperatorTestBase.IGNORED_OPERATOR_NAME; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.iceberg.flink.maintenance.operator.CollectingSink; +import org.apache.iceberg.flink.maintenance.operator.ManualSource; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +/** + * {@link org.junit.jupiter.api.extension.Extension} used to generate the common elements for the + * {@link MaintenanceTaskBuilder} implementations. These are the following: + * + *

    + *
  • {@link StreamExecutionEnvironment} - environment for testing + *
  • {@link ManualSource} - source for manually emitting {@link Trigger}s + *
  • {@link DataStream} - which generated from the {@link ManualSource} + *
  • {@link CollectingSink} - which could be used poll for the records emitted by the + * maintenance tasks + *
+ */ +class ScheduledInfraExtension implements BeforeEachCallback { + private StreamExecutionEnvironment env; + private ManualSource source; + private DataStream triggerStream; + private CollectingSink sink; + + @Override + public void beforeEach(ExtensionContext context) { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + source = new ManualSource<>(env, TypeInformation.of(Trigger.class)); + // Adds the watermark to mimic the behaviour expected for the input of the maintenance tasks + triggerStream = + source + .dataStream() + .assignTimestampsAndWatermarks(new TableMaintenance.WindowClosingWatermarkStrategy()) + .name(IGNORED_OPERATOR_NAME) + .forceNonParallel(); + sink = new CollectingSink<>(); + } + + StreamExecutionEnvironment env() { + return env; + } + + ManualSource source() { + return source; + } + + DataStream triggerStream() { + return triggerStream; + } + + CollectingSink sink() { + return sink; + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestExpireSnapshots.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestExpireSnapshots.java new file mode 100644 index 000000000000..8d3c8864ebe3 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestExpireSnapshots.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import static org.apache.iceberg.flink.SimpleDataUtil.createRecord; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER; +import static org.apache.iceberg.flink.maintenance.stream.ExpireSnapshots.DELETE_FILES_TASK_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.Set; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TestExpireSnapshots extends ScheduledBuilderTestBase { + @BeforeEach + void before() { + MetricsReporterFactoryForTests.reset(); + } + + @Test + void testExpireSnapshots() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (2, 'b')", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (3, 'c')", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (4, 'd')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + Table table = tableLoader.loadTable(); + Set snapshots = Sets.newHashSet(table.snapshots()); + assertThat(snapshots).hasSize(4); + + ExpireSnapshots.builder() + .parallelism(1) + .planningWorkerPoolSize(2) + .deleteAttemptNum(2) + .deleteWorkerPoolSize(5) + .minAge(Duration.ZERO) + .retainLast(1) + .uidPrefix(UID_PREFIX) + .build( + infra.triggerStream(), + 0, + DUMMY_NAME, + tableLoader, + "OTHER", + StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, + 1) + .sinkTo(infra.sink()); + + runAndWaitForSuccess( + infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(3L), table); + + // Check that the table data not changed + table.refresh(); + assertThat(Sets.newHashSet(table.snapshots())).hasSize(1); + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + createRecord(1, "a"), + createRecord(2, "b"), + createRecord(3, "c"), + createRecord(4, "d"))); + } + + @Test + void testFailure() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (2, 'b')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + Table table = tableLoader.loadTable(); + SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table); + + ExpireSnapshots.builder() + .build( + infra.triggerStream(), + 0, + DUMMY_NAME, + tableLoader, + UID_PREFIX, + StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, + 1) + .sinkTo(infra.sink()); + + JobClient jobClient = null; + try { + jobClient = infra.env().executeAsync(); + + // Do a single task run + long time = System.currentTimeMillis(); + infra + .source() + .sendRecord(Trigger.create(time, serializableTable, 1), System.currentTimeMillis()); + + // First successful run (ensure that the operators are loaded/opened etc.) + assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue(); + + // Drop the table, so it will cause an exception + sql.catalogLoader().loadCatalog().dropTable(TableIdentifier.of(DB_NAME, TABLE_NAME)); + + // Failed run + infra.source().sendRecord(Trigger.create(time + 1, serializableTable, 1), time + 1); + + assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isFalse(); + } finally { + closeJobClient(jobClient); + } + + // Check the metrics + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder() + .put(DELETE_FILES_TASK_NAME + "." + DUMMY_NAME + "." + DELETE_FILE_FAILED_COUNTER, 0L) + .put( + DELETE_FILES_TASK_NAME + "." + DUMMY_NAME + "." + DELETE_FILE_SUCCEEDED_COUNTER, 0L) + .build()); + } + + @Test + void testUidAndSlotSharingGroup() { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + + ExpireSnapshots.builder() + .slotSharingGroup(SLOT_SHARING_GROUP) + .uidPrefix(UID_PREFIX) + .build( + infra.triggerStream(), + 0, + DUMMY_NAME, + tableLoader, + UID_PREFIX, + StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, + 1) + .sinkTo(infra.sink()); + + checkUidsAreSet(infra.env(), UID_PREFIX); + checkSlotSharingGroupsAreSet(infra.env(), SLOT_SHARING_GROUP); + } + + @Test + void testUidAndSlotSharingGroupUnset() { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + + ExpireSnapshots.builder() + .build( + infra.triggerStream(), + 0, + DUMMY_NAME, + tableLoader, + UID_PREFIX, + StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, + 1) + .sinkTo(infra.sink()); + + checkUidsAreSet(infra.env(), null); + checkSlotSharingGroupsAreSet(infra.env(), null); + } + + @Test + void testMetrics() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (2, 'b')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + Table table = tableLoader.loadTable(); + + ExpireSnapshots.builder() + .minAge(Duration.ZERO) + .retainLast(1) + .parallelism(1) + .build( + infra.triggerStream(), + 0, + DUMMY_NAME, + tableLoader, + UID_PREFIX, + StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP, + 1) + .sinkTo(infra.sink()); + + runAndWaitForSuccess( + infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(1L), table); + + // Check the metrics + Awaitility.await() + .untilAsserted( + () -> + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder() + .put( + DELETE_FILES_TASK_NAME + + "." + + DUMMY_NAME + + "." + + DELETE_FILE_FAILED_COUNTER, + 0L) + .put( + DELETE_FILES_TASK_NAME + + "." + + DUMMY_NAME + + "." + + DELETE_FILE_SUCCEEDED_COUNTER, + 1L) + .build())); + } + + private static boolean checkDeleteFinished(Long expectedDeleteNum) { + return expectedDeleteNum.equals( + MetricsReporterFactoryForTests.counter( + DELETE_FILES_TASK_NAME + "." + DUMMY_NAME + "." + DELETE_FILE_SUCCEEDED_COUNTER)); + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestMaintenanceE2E.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestMaintenanceE2E.java new file mode 100644 index 000000000000..fd9b136af21d --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestMaintenanceE2E.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TestMaintenanceE2E extends OperatorTestBase { + private StreamExecutionEnvironment env; + + @BeforeEach + public void beforeEach() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @Test + void testE2e() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + + TableMaintenance.builder(env, tableLoader, LOCK_FACTORY) + .uidPrefix("E2eTestUID") + .rateLimit(Duration.ofMinutes(10)) + .concurrentCheckDelay(Duration.ofSeconds(10)) + .add( + ExpireSnapshots.builder() + .scheduleOnCommitCount(10) + .minAge(Duration.ofMinutes(10)) + .retainLast(5) + .deleteWorkerPoolSize(5) + .parallelism(8)) + .append(); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // Just make sure that we are able to instantiate the flow + assertThat(jobClient).isNotNull(); + } finally { + closeJobClient(jobClient); + } + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestTableMaintenance.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestTableMaintenance.java new file mode 100644 index 000000000000..c7fa51675886 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestTableMaintenance.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import static org.apache.iceberg.flink.SimpleDataUtil.createRowData; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.FAILED_TASK_COUNTER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED; +import static org.apache.iceberg.flink.maintenance.stream.TableMaintenance.LOCK_REMOVER_TASK_NAME; +import static org.apache.iceberg.flink.maintenance.stream.TableMaintenance.SOURCE_NAME; +import static org.apache.iceberg.flink.maintenance.stream.TableMaintenance.TRIGGER_MANAGER_TASK_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.Serializable; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.ManualSource; +import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests; +import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class TestTableMaintenance extends OperatorTestBase { + private static final String[] TASKS = + new String[] { + MaintenanceTaskBuilderForTest.class.getSimpleName() + " [0]", + MaintenanceTaskBuilderForTest.class.getSimpleName() + " [1]" + }; + private static final TableChange DUMMY_CHANGE = TableChange.builder().commitCount(1).build(); + private static final List PROCESSED = + Collections.synchronizedList(Lists.newArrayListWithCapacity(1)); + + private StreamExecutionEnvironment env; + + @TempDir private File checkpointDir; + + @BeforeEach + public void beforeEach() { + Configuration config = new Configuration(); + config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + checkpointDir.getPath()); + env = StreamExecutionEnvironment.getExecutionEnvironment(config); + PROCESSED.clear(); + MaintenanceTaskBuilderForTest.counter = 0; + } + + @Test + void testFromStream() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + tableLoader.open(); + + ManualSource schedulerSource = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + + TableMaintenance.Builder streamBuilder = + TableMaintenance.builder(schedulerSource.dataStream(), tableLoader, LOCK_FACTORY) + .rateLimit(Duration.ofMillis(2)) + .concurrentCheckDelay(Duration.ofSeconds(3)) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .scheduleOnDataFileCount(2) + .scheduleOnDataFileSize(3L) + .scheduleOnEqDeleteFileCount(4) + .scheduleOnEqDeleteRecordCount(5L) + .schedulerOnPosDeleteFileCount(6) + .schedulerOnPosDeleteRecordCount(7L) + .scheduleOnTime(Duration.ofHours(1))); + + sendEvents(schedulerSource, streamBuilder, ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1))); + } + + @Test + void testFromEnv() throws Exception { + sql.exec( + "CREATE TABLE %s (id int, data varchar)" + + "WITH ('flink.max-continuous-empty-commits'='100000')", + TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + tableLoader.open(); + Table table = tableLoader.loadTable(); + + env.enableCheckpointing(10); + + TableMaintenance.builder(env, tableLoader, LOCK_FACTORY) + .rateLimit(Duration.ofMillis(2)) + .maxReadBack(2) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(2)) + .append(); + + // Creating a stream for inserting data into the table concurrently + ManualSource insertSource = + new ManualSource<>(env, InternalTypeInfo.of(FlinkSchemaUtil.convert(table.schema()))); + FlinkSink.forRowData(insertSource.dataStream()) + .tableLoader(tableLoader) + .uidPrefix(UID_PREFIX + "-iceberg-sink") + .append(); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + insertSource.sendRecord(createRowData(2, "b")); + + Awaitility.await().until(() -> PROCESSED.size() == 1); + } finally { + closeJobClient(jobClient); + } + } + + @Test + void testLocking() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + tableLoader.open(); + + TriggerLockFactory.Lock lock = LOCK_FACTORY.createLock(); + + ManualSource schedulerSource = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + + TableMaintenance.Builder streamBuilder = + TableMaintenance.builder(schedulerSource.dataStream(), tableLoader, LOCK_FACTORY) + .rateLimit(Duration.ofMillis(2)) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)); + + assertThat(lock.isHeld()).isFalse(); + sendEvents(schedulerSource, streamBuilder, ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1))); + + assertThat(lock.isHeld()).isFalse(); + } + + @Test + void testMetrics() throws Exception { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + tableLoader.open(); + + ManualSource schedulerSource = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + + TableMaintenance.Builder streamBuilder = + TableMaintenance.builder(schedulerSource.dataStream(), tableLoader, LOCK_FACTORY) + .rateLimit(Duration.ofMillis(2)) + .concurrentCheckDelay(Duration.ofMillis(2)) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)) + .add(new MaintenanceTaskBuilderForTest(false).scheduleOnCommitCount(2)); + + sendEvents( + schedulerSource, + streamBuilder, + ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1), Tuple2.of(DUMMY_CHANGE, 2))); + + Awaitility.await() + .until( + () -> + MetricsReporterFactoryForTests.counter( + LOCK_REMOVER_TASK_NAME + "." + TASKS[0] + "." + SUCCEEDED_TASK_COUNTER) + .equals(2L)); + + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder() + .put(LOCK_REMOVER_TASK_NAME + "." + TASKS[0] + "." + SUCCEEDED_TASK_COUNTER, 2L) + .put(LOCK_REMOVER_TASK_NAME + "." + TASKS[0] + "." + FAILED_TASK_COUNTER, 0L) + .put(TRIGGER_MANAGER_TASK_NAME + "." + TASKS[0] + "." + TRIGGERED, 2L) + .put(LOCK_REMOVER_TASK_NAME + "." + TASKS[1] + "." + SUCCEEDED_TASK_COUNTER, 0L) + .put(LOCK_REMOVER_TASK_NAME + "." + TASKS[1] + "." + FAILED_TASK_COUNTER, 1L) + .put(TRIGGER_MANAGER_TASK_NAME + "." + TASKS[1] + "." + TRIGGERED, 1L) + .put( + TRIGGER_MANAGER_TASK_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER, + -1L) + .put( + TRIGGER_MANAGER_TASK_NAME + + "." + + GROUP_VALUE_DEFAULT + + "." + + CONCURRENT_RUN_THROTTLED, + -1L) + .put( + TRIGGER_MANAGER_TASK_NAME + + "." + + GROUP_VALUE_DEFAULT + + "." + + RATE_LIMITER_TRIGGERED, + -1L) + .build()); + } + + @Test + void testUidAndSlotSharingGroup() { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + tableLoader.open(); + + TableMaintenance.builder( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader, + LOCK_FACTORY) + .uidPrefix(UID_PREFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .uidPrefix(UID_PREFIX) + .slotSharingGroup(SLOT_SHARING_GROUP)) + .append(); + + checkUidsAreSet(env, UID_PREFIX); + checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); + } + + @Test + void testUidAndSlotSharingGroupUnset() { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + tableLoader.open(); + + TableMaintenance.builder( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader, + LOCK_FACTORY) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)) + .append(); + + checkUidsAreSet(env, null); + checkSlotSharingGroupsAreSet(env, null); + } + + @Test + void testUidAndSlotSharingGroupInherit() { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + tableLoader.open(); + + TableMaintenance.builder( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader, + LOCK_FACTORY) + .uidPrefix(UID_PREFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)) + .append(); + + checkUidsAreSet(env, UID_PREFIX); + checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); + } + + @Test + void testUidAndSlotSharingGroupOverWrite() { + String anotherUid = "Another-UID"; + String anotherSlotSharingGroup = "Another-SlotSharingGroup"; + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + tableLoader.open(); + + TableMaintenance.builder( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader, + LOCK_FACTORY) + .uidPrefix(UID_PREFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .uidPrefix(anotherUid) + .slotSharingGroup(anotherSlotSharingGroup)) + .append(); + + // Something from the scheduler + Transformation schedulerTransformation = + env.getTransformations().stream() + .filter(t -> t.getName().equals("Trigger manager")) + .findFirst() + .orElseThrow(); + assertThat(schedulerTransformation.getUid()).contains(UID_PREFIX); + assertThat(schedulerTransformation.getSlotSharingGroup()).isPresent(); + assertThat(schedulerTransformation.getSlotSharingGroup().get().getName()) + .isEqualTo(SLOT_SHARING_GROUP); + + // Something from the scheduled stream + Transformation scheduledTransformation = + env.getTransformations().stream() + .filter( + t -> t.getName().startsWith(MaintenanceTaskBuilderForTest.class.getSimpleName())) + .findFirst() + .orElseThrow(); + assertThat(scheduledTransformation.getUid()).contains(anotherUid); + assertThat(scheduledTransformation.getSlotSharingGroup()).isPresent(); + assertThat(scheduledTransformation.getSlotSharingGroup().get().getName()) + .isEqualTo(anotherSlotSharingGroup); + } + + @Test + void testUidAndSlotSharingGroupForMonitor() { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + sql.exec("INSERT INTO %s VALUES (1, 'a')", TABLE_NAME); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + tableLoader.open(); + + TableMaintenance.builder(env, tableLoader, LOCK_FACTORY) + .uidPrefix(UID_PREFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .uidPrefix(UID_PREFIX) + .slotSharingGroup(SLOT_SHARING_GROUP)) + .append(); + + Transformation source = monitorSource(); + assertThat(source).isNotNull(); + assertThat(source.getUid()).contains(UID_PREFIX); + assertThat(source.getSlotSharingGroup()).isPresent(); + assertThat(source.getSlotSharingGroup().get().getName()).isEqualTo(SLOT_SHARING_GROUP); + + checkUidsAreSet(env, UID_PREFIX); + checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); + } + + /** + * Sends the events though the {@link ManualSource} provided, and waits until the given number of + * records are processed. + * + * @param schedulerSource used for sending the events + * @param streamBuilder used for generating the job + * @param eventsAndResultNumbers the pair of the event and the expected processed records + * @throws Exception if any + */ + private void sendEvents( + ManualSource schedulerSource, + TableMaintenance.Builder streamBuilder, + List> eventsAndResultNumbers) + throws Exception { + streamBuilder.append(); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + eventsAndResultNumbers.forEach( + eventsAndResultNumber -> { + int expectedSize = PROCESSED.size() + eventsAndResultNumber.f1; + schedulerSource.sendRecord(eventsAndResultNumber.f0); + Awaitility.await() + .until( + () -> PROCESSED.size() == expectedSize && !LOCK_FACTORY.createLock().isHeld()); + }); + } finally { + closeJobClient(jobClient); + } + } + + private static class MaintenanceTaskBuilderForTest + extends MaintenanceTaskBuilder { + private final boolean success; + private final int id; + private static int counter = 0; + + MaintenanceTaskBuilderForTest(boolean success) { + this.success = success; + this.id = counter; + ++counter; + } + + @Override + DataStream buildInternal(DataStream trigger) { + String name = TASKS[id]; + return trigger + .map(new DummyMaintenanceTask(success)) + .name(name) + .uid(uidPrefix() + "-test-mapper-" + name) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + } + } + + /** + * Finds the {@link org.apache.iceberg.flink.maintenance.operator.MonitorSource} for testing + * purposes by parsing the transformation tree. + * + * @return The monitor source if we found it + */ + private Transformation monitorSource() { + assertThat(env.getTransformations()).isNotEmpty(); + assertThat(env.getTransformations().get(0).getInputs()).isNotEmpty(); + assertThat(env.getTransformations().get(0).getInputs().get(0).getInputs()).isNotEmpty(); + + Transformation result = + env.getTransformations().get(0).getInputs().get(0).getInputs().get(0); + + // Some checks to make sure this is the transformation we are looking for + assertThat(result).isInstanceOf(SourceTransformation.class); + assertThat(result.getName()).isEqualTo(SOURCE_NAME); + + return result; + } + + private static class DummyMaintenanceTask + implements MapFunction, ResultTypeQueryable, Serializable { + private final boolean success; + + private DummyMaintenanceTask(boolean success) { + this.success = success; + } + + @Override + public TaskResult map(Trigger trigger) { + // Ensure that the lock is held when processing + assertThat(LOCK_FACTORY.createLock().isHeld()).isTrue(); + PROCESSED.add(trigger); + + return new TaskResult( + trigger.taskId(), + trigger.timestamp(), + success, + success ? Collections.emptyList() : Lists.newArrayList(new Exception("Testing error"))); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(TaskResult.class); + } + } +}