Skip to content

Commit

Permalink
Flink: Maintenance - TableManager + ExpireSnapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Sep 16, 2024
1 parent 8b4b2c1 commit 70e0783
Show file tree
Hide file tree
Showing 27 changed files with 2,338 additions and 205 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.operator;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Delete the files using the {@link FileIO}. */
@Internal
public class AsyncDeleteFiles extends RichAsyncFunction<String, Boolean> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncDeleteFiles.class);
public static final Predicate<Collection<Boolean>> FAILED_PREDICATE = new FailedPredicate();

private final String name;
private final FileIO io;
private final int workerPoolSize;
private final String tableName;

private transient ExecutorService workerPool;
private transient Counter failedCounter;
private transient Counter succeededCounter;

public AsyncDeleteFiles(String name, TableLoader tableLoader, int workerPoolSize) {
Preconditions.checkNotNull(name, "Name should no be null");
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");

this.name = name;
tableLoader.open();
Table table = tableLoader.loadTable();
this.io = table.io();
this.workerPoolSize = workerPoolSize;
this.tableName = table.name();
}

@Override
public void open(Configuration parameters) throws Exception {
this.failedCounter =
getRuntimeContext()
.getMetricGroup()
.addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
.counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER);
this.succeededCounter =
getRuntimeContext()
.getMetricGroup()
.addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
.counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER);

this.workerPool =
ThreadPools.newWorkerPool(tableName + "-" + name + "-async-delete-files", workerPoolSize);
}

@Override
public void asyncInvoke(String fileName, ResultFuture<Boolean> resultFuture) {
workerPool.execute(
() -> {
try {
LOG.info("Deleting file: {} with {}", fileName, name);
io.deleteFile(fileName);
resultFuture.complete(Collections.singletonList(true));
succeededCounter.inc();
} catch (Throwable e) {
LOG.info("Failed to delete file {} with {}", fileName, name, e);
resultFuture.complete(Collections.singletonList(false));
failedCounter.inc();
}
});
}

private static class FailedPredicate implements Predicate<Collection<Boolean>>, Serializable {
@Override
public boolean test(Collection<Boolean> collection) {
return collection.size() != 1 || !collection.iterator().next();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.operator;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could
* be removed in the {@link #DELETE_STREAM} side output.
*/
public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {
private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
public static final OutputTag<String> DELETE_STREAM =
new OutputTag<>("delete-stream", Types.STRING);

private final TableLoader tableLoader;
private final Long minAgeMs;
private final Integer retainLast;
private final int plannerPoolSize;
private transient ExecutorService plannerPool;
private transient Table table;

public ExpireSnapshotsProcessor(
TableLoader tableLoader, Long minAgeMs, Integer retainLast, int plannerPoolSize) {
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");

this.tableLoader = tableLoader;
this.minAgeMs = minAgeMs;
this.retainLast = retainLast;
this.plannerPoolSize = plannerPoolSize;
}

@Override
public void open(Configuration parameters) throws Exception {
tableLoader.open();
this.table = tableLoader.loadTable();
this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize);
}

@Override
public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out)
throws Exception {
try {
table.refresh();
ExpireSnapshots expireSnapshots = table.expireSnapshots();
if (minAgeMs != null) {
expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - minAgeMs);
}

if (retainLast != null) {
expireSnapshots = expireSnapshots.retainLast(retainLast);
}

expireSnapshots
.planWith(plannerPool)
.deleteWith(file -> ctx.output(DELETE_STREAM, file))
.cleanExpiredFiles(true)
.commit();

LOG.info("Successfully finished expiring snapshots for {} at {}", table, ctx.timestamp());
out.collect(
new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList()));
} catch (Exception e) {
LOG.info("Exception expiring snapshots for {} at {}", table, ctx.timestamp(), e);
out.collect(
new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList(e)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

/** Monitors an Iceberg table for changes */
@Internal
class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
public class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class);

private final TableLoader tableLoader;
Expand All @@ -58,7 +58,7 @@ class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
* @param rateLimiterStrategy limits the frequency the table is checked
* @param maxReadBack sets the number of snapshots read before stopping change collection
*/
MonitorSource(
public MonitorSource(
TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) {
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should no be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/** Event describing changes in an Iceberg table */
@Internal
class TableChange {
public class TableChange {
private int dataFileCount;
private long dataFileSizeInBytes;
private int posDeleteFileCount;
Expand Down Expand Up @@ -87,7 +87,7 @@ static TableChange empty() {
return new TableChange(0, 0L, 0, 0L, 0, 0L, 0);
}

static Builder builder() {
public static Builder builder() {
return new Builder();
}

Expand Down Expand Up @@ -115,7 +115,7 @@ long eqDeleteRecordCount() {
return eqDeleteRecordCount;
}

public int commitCount() {
int commitCount() {
return commitCount;
}

Expand Down Expand Up @@ -183,7 +183,7 @@ public int hashCode() {
commitCount);
}

static class Builder {
public static class Builder {
private int dataFileCount = 0;
private long dataFileSizeInBytes = 0L;
private int posDeleteFileCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class TableMaintenanceMetrics {
public static final String FAILED_TASK_COUNTER = "failedTasks";
public static final String LAST_RUN_DURATION_MS = "lastRunDurationMs";

// DeleteFiles metrics
public static final String DELETE_FILE_FAILED_COUNTER = "deleteFailed";
public static final String DELETE_FILE_SUCCEEDED_COUNTER = "deleteSucceeded";

private TableMaintenanceMetrics() {
// do not instantiate
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

@Internal
class Trigger {
public class Trigger {
private final long timestamp;
private final SerializableTable table;
private final Integer taskId;
Expand All @@ -36,23 +36,23 @@ private Trigger(long timestamp, SerializableTable table, Integer taskId, boolean
this.isRecovery = isRecovery;
}

static Trigger create(long timestamp, SerializableTable table, int taskId) {
public static Trigger create(long timestamp, SerializableTable table, int taskId) {
return new Trigger(timestamp, table, taskId, false);
}

static Trigger recovery(long timestamp) {
return new Trigger(timestamp, null, null, true);
}

long timestamp() {
public long timestamp() {
return timestamp;
}

SerializableTable table() {
return table;
}

Integer taskId() {
public Integer taskId() {
return taskId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.slf4j.LoggerFactory;

@Internal
class TriggerEvaluator implements Serializable {
public class TriggerEvaluator implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(TriggerEvaluator.class);
private final List<Predicate> predicates;

Expand All @@ -50,7 +50,7 @@ boolean check(TableChange event, long lastTimeMs, long currentTimeMs) {
return result;
}

static class Builder implements Serializable {
public static class Builder implements Serializable {
private Integer dataFileCount;
private Long dataFileSizeInBytes;
private Integer posDeleteFileCount;
Expand Down Expand Up @@ -95,12 +95,12 @@ public Builder commitCount(int newCommitCount) {
return this;
}

Builder timeout(Duration newTimeout) {
public Builder timeout(Duration newTimeout) {
this.timeout = newTimeout;
return this;
}

TriggerEvaluator build() {
public TriggerEvaluator build() {
List<Predicate> predicates = Lists.newArrayList();
if (dataFileCount != null) {
predicates.add((change, unused, unused2) -> change.dataFileCount() >= dataFileCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,11 @@ interface Lock {
*/
boolean isHeld();

// TODO: Fix the link to the LockRemover when we have a final name and implementation
/**
* Releases the lock. Should not fail if the lock is not held by anyone.
*
* <p>Called by LockRemover. Implementations could assume that are no concurrent calls for this
* method.
* <p>Called by {@link LockRemover}. Implementations could assume that are no concurrent calls
* for this method.
*/
void unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
* the timer functions are available, but the key is not used.
*/
@Internal
class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
public class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class);

Expand Down Expand Up @@ -89,7 +89,7 @@ class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
private transient int startsFrom = 0;
private transient boolean triggered = false;

TriggerManager(
public TriggerManager(
TableLoader tableLoader,
TriggerLockFactory lockFactory,
List<String> maintenanceTaskNames,
Expand Down
Loading

0 comments on commit 70e0783

Please sign in to comment.