Flink: port table refresh (#8555) to v1.15 and v1.16
bryanck committed Sep 21, 2023
Parent: d213b16 · Commit: f6170cc
Showing 36 changed files with 900 additions and 102 deletions.
flink/v1.15/build.gradle (5 additions & 0 deletions)
@@ -114,6 +114,11 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
}

testImplementation libs.awaitility
testImplementation libs.assertj.core
}

test {
useJUnitPlatform()
}
}

flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -18,11 +18,13 @@
*/
package org.apache.iceberg.flink;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -59,6 +61,10 @@ public StringConfParser stringConf() {
return new StringConfParser();
}

public DurationConfParser durationConf() {
return new DurationConfParser();
}

class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
private Boolean defaultValue;

@@ -180,6 +186,29 @@ public E parseOptional() {
}
}

class DurationConfParser extends ConfParser<DurationConfParser, Duration> {
private Duration defaultValue;

@Override
protected DurationConfParser self() {
return this;
}

public DurationConfParser defaultValue(Duration value) {
this.defaultValue = value;
return self();
}

public Duration parse() {
Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
return parse(TimeUtils::parseDuration, defaultValue);
}

public Duration parseOptional() {
return parse(TimeUtils::parseDuration, null);
}
}

abstract class ConfParser<ThisT, T> {
private final List<String> optionNames = Lists.newArrayList();
private String tablePropertyName;
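For reference, the new DurationConfParser delegates string parsing to Flink's TimeUtils. A minimal standalone sketch, not part of this commit, of the duration formats that utility accepts:

import java.time.Duration;
import org.apache.flink.util.TimeUtils;

public class DurationParsingSketch {
  public static void main(String[] args) {
    // TimeUtils.parseDuration takes a number plus an optional unit label,
    // e.g. "30 s", "5 min", "2 h"; a bare number is read as milliseconds.
    Duration thirtySeconds = TimeUtils.parseDuration("30 s");
    Duration fiveMinutes = TimeUtils.parseDuration("5 min");
    System.out.println(thirtySeconds.toMillis()); // 30000
    System.out.println(fiveMinutes.toMinutes());  // 5
  }
}

This means values set through table properties or write options use the same syntax as Flink's own duration settings.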
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -18,7 +18,9 @@
*/
package org.apache.iceberg.flink;

import java.time.Duration;
import java.util.Map;
import org.apache.calcite.linq4j.function.Experimental;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
@@ -184,4 +186,20 @@ public String branch() {
public Integer writeParallelism() {
return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
}

/**
* NOTE: This may be removed or changed in a future release. This value specifies the interval for
* refreshing the table instances in sink writer subtasks. If not specified then the default
* behavior is to not refresh the table.
*
* @return the interval for refreshing the table in sink writer subtasks
*/
@Experimental
public Duration tableRefreshInterval() {
return confParser
.durationConf()
.option(FlinkWriteOptions.TABLE_REFRSH_INTERVAL.key())
.flinkConfig(FlinkWriteOptions.TABLE_REFRSH_INTERVAL)
.parseOptional();
}
}
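To see how a job opts in, here is a hedged usage sketch; the stream and loader names are assumed, not from this commit, and omitting the option preserves the old behavior of never refreshing:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class RefreshingSinkSketch {
  // rowDataStream and tableLoader are assumed to exist in the surrounding job.
  static void appendSink(DataStream<RowData> rowDataStream, TableLoader tableLoader) {
    FlinkSink.forRowData(rowDataStream)
        .tableLoader(tableLoader)
        // Write option read by FlinkWriteConf#tableRefreshInterval() above.
        .set("table-refresh-interval", "30 min")
        .append();
  }
}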
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;

import java.time.Duration;
import org.apache.calcite.linq4j.function.Experimental;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.SnapshotRef;
@@ -64,4 +66,8 @@ private FlinkWriteOptions() {}

public static final ConfigOption<Integer> WRITE_PARALLELISM =
ConfigOptions.key("write-parallelism").intType().noDefaultValue();

@Experimental
public static final ConfigOption<Duration> TABLE_REFRSH_INTERVAL =
ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue();
}
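Flink's durationType() options parse the same human-readable strings from job configuration. A self-contained sketch, with assumed example values, of how such an option resolves:

import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class DurationOptionSketch {
  static final ConfigOption<Duration> REFRESH_INTERVAL =
      ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue();

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString("table-refresh-interval", "5 min");
    // Resolves to a java.time.Duration; absent keys yield null here since
    // the option declares no default.
    Duration interval = conf.get(REFRESH_INTERVAL);
    System.out.println(interval); // PT5M
  }
}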
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
@@ -38,6 +38,8 @@ public interface TableLoader extends Closeable, Serializable, Cloneable {

void open();

boolean isOpen();

Table loadTable();

/** Clone a TableLoader */
@@ -75,6 +77,11 @@ public void open() {
tables = new HadoopTables(hadoopConf.get());
}

@Override
public boolean isOpen() {
return tables != null;
}

@Override
public Table loadTable() {
FlinkEnvironmentContext.init();
@@ -115,6 +122,11 @@ public void open() {
catalog = catalogLoader.loadCatalog();
}

@Override
public boolean isOpen() {
return catalog != null;
}

@Override
public Table loadTable() {
FlinkEnvironmentContext.init();
@@ -126,6 +138,8 @@ public void close() throws IOException {
if (catalog instanceof Closeable) {
((Closeable) catalog).close();
}

catalog = null;
}

@Override
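The new isOpen() method lets callers guard open() instead of reopening unconditionally, which is exactly how CachingTableSupplier and FlinkSink below use it. A caller-side sketch (the helper name is illustrative only):

import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

public class OpenGuardSketch {
  static Table loadSafely(TableLoader tableLoader) {
    // Several code paths may share one loader; open it only if needed so an
    // already-open catalog is not re-created.
    if (!tableLoader.isOpen()) {
      tableLoader.open();
    }
    return tableLoader.loadTable();
  }
}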
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java (new file)
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.time.Duration;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A table loader that will only reload a table after a certain interval has passed. WARNING: This
* table loader should be used carefully when used with writer tasks. It could result in heavy load
* on a catalog for jobs with many writers.
*/
class CachingTableSupplier implements SerializableSupplier<Table> {

private static final Logger LOG = LoggerFactory.getLogger(CachingTableSupplier.class);

private final Table initialTable;
private final TableLoader tableLoader;
private final Duration tableRefreshInterval;
private long lastLoadTimeMillis;
private transient Table table;

CachingTableSupplier(
SerializableTable initialTable, TableLoader tableLoader, Duration tableRefreshInterval) {
Preconditions.checkArgument(initialTable != null, "initialTable cannot be null");
Preconditions.checkArgument(tableLoader != null, "tableLoader cannot be null");
Preconditions.checkArgument(
tableRefreshInterval != null, "tableRefreshInterval cannot be null");
this.initialTable = initialTable;
this.table = initialTable;
this.tableLoader = tableLoader;
this.tableRefreshInterval = tableRefreshInterval;
this.lastLoadTimeMillis = System.currentTimeMillis();
}

@Override
public Table get() {
if (table == null) {
this.table = initialTable;
}
return table;
}

Table initialTable() {
return initialTable;
}

void refreshTable() {
if (System.currentTimeMillis() > lastLoadTimeMillis + tableRefreshInterval.toMillis()) {
try {
if (!tableLoader.isOpen()) {
tableLoader.open();
}

this.table = tableLoader.loadTable();
this.lastLoadTimeMillis = System.currentTimeMillis();

LOG.info(
"Table {} reloaded, next min load time threshold is {}",
table.name(),
DateTimeUtil.formatTimestampMillis(
lastLoadTimeMillis + tableRefreshInterval.toMillis()));
} catch (Exception e) {
LOG.warn("An error occurred reloading table {}, table was not reloaded", table.name(), e);
}
}
}
}
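The pattern above generalizes: get() always serves the cached value, while refresh() reloads at most once per interval and keeps the stale value if loading fails. A simplified, Iceberg-free sketch of the same idea, not the commit's exact code:

import java.time.Duration;
import java.util.function.Supplier;

class IntervalCachedSupplier<T> implements Supplier<T> {
  private final Supplier<T> loader;
  private final long intervalMillis;
  private T cached;
  private long lastLoadTimeMillis;

  IntervalCachedSupplier(Supplier<T> loader, Duration interval, T initialValue) {
    this.loader = loader;
    this.intervalMillis = interval.toMillis();
    this.cached = initialValue;
    this.lastLoadTimeMillis = System.currentTimeMillis();
  }

  @Override
  public T get() {
    return cached;
  }

  void refresh() {
    if (System.currentTimeMillis() > lastLoadTimeMillis + intervalMillis) {
      try {
        this.cached = loader.get();
        this.lastLoadTimeMillis = System.currentTimeMillis();
      } catch (RuntimeException e) {
        // Same policy as CachingTableSupplier: keep serving the stale value.
      }
    }
  }
}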
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -24,13 +24,11 @@
import java.util.function.Supplier;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
@@ -64,16 +62,14 @@ static List<DataFile> readDataFiles(
}

static ManifestOutputFileFactory createOutputFileFactory(
-Table table, String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) {
-TableOperations ops = ((HasTableOperations) table).operations();
Supplier<Table> tableSupplier,
Map<String, String> tableProps,
String flinkJobId,
String operatorUniqueId,
int subTaskId,
long attemptNumber) {
return new ManifestOutputFileFactory(
-ops,
-table.io(),
-table.properties(),
-flinkJobId,
-operatorUniqueId,
-subTaskId,
-attemptNumber);
tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, attemptNumber);
}

/**
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -28,6 +28,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,6 +68,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -330,7 +332,10 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);

if (table == null) {
-tableLoader.open();
if (!tableLoader.isOpen()) {
tableLoader.open();
}

try (TableLoader loader = tableLoader) {
this.table = loader.loadTable();
} catch (IOException e) {
@@ -462,8 +467,19 @@ private SingleOutputStreamOperator<WriteResult> appendWriter(
}
}

SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
Duration tableRefreshInterval = flinkWriteConf.tableRefreshInterval();

SerializableSupplier<Table> tableSupplier;
if (tableRefreshInterval != null) {
tableSupplier =
new CachingTableSupplier(serializableTable, tableLoader, tableRefreshInterval);
} else {
tableSupplier = () -> serializableTable;
}

IcebergStreamWriter<RowData> streamWriter =
-createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds);
createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds);

int parallelism =
flinkWriteConf.writeParallelism() == null
@@ -580,24 +596,25 @@ static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
}

static IcebergStreamWriter<RowData> createStreamWriter(
-Table table,
SerializableSupplier<Table> tableSupplier,
FlinkWriteConf flinkWriteConf,
RowType flinkRowType,
List<Integer> equalityFieldIds) {
-Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null");

-Table serializableTable = SerializableTable.copyOf(table);
Table initTable = tableSupplier.get();
FileFormat format = flinkWriteConf.dataFileFormat();
TaskWriterFactory<RowData> taskWriterFactory =
new RowDataTaskWriterFactory(
-serializableTable,
tableSupplier,
flinkRowType,
flinkWriteConf.targetDataFileSize(),
format,
-writeProperties(table, format, flinkWriteConf),
writeProperties(initTable, format, flinkWriteConf),
equalityFieldIds,
flinkWriteConf.upsertMode());
-return new IcebergStreamWriter<>(table.name(), taskWriterFactory);

return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory);
}

/**
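The writer-side change above swaps a fixed Table for a Supplier<Table>: with no refresh interval configured the supplier pins one serialized snapshot, and with an interval it is a CachingTableSupplier. A condensed sketch of that decision, with simplified types rather than the commit's exact code:

import java.io.Serializable;
import java.time.Duration;
import java.util.function.Supplier;

public class SupplierChoiceSketch {
  // Mirrors org.apache.iceberg.util.SerializableSupplier: a Supplier that can
  // be serialized with the operator and shipped to writer subtasks.
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  static <T> SerializableSupplier<T> chooseSupplier(
      T frozenSnapshot, SerializableSupplier<T> refreshingSupplier, Duration refreshInterval) {
    if (refreshInterval != null) {
      return refreshingSupplier; // CachingTableSupplier in the real code
    }
    // Serializable lambda pinning a single snapshot; the captured value must
    // itself be serializable (SerializableTable.copyOf ensures that above).
    return () -> frozenSnapshot;
  }
}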
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -160,7 +160,7 @@ public void initializeState(StateInitializationContext context) throws Exception
int attemptId = getRuntimeContext().getAttemptNumber();
this.manifestOutputFileFactory =
FlinkManifestUtil.createOutputFileFactory(
-table, flinkJobId, operatorUniqueId, subTaskId, attemptId);
() -> table, table.properties(), flinkJobId, operatorUniqueId, subTaskId, attemptId);
this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
@@ -247,6 +247,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
checkpointId,
maxCommittedCheckpointId);
}

// reload the table in case new configuration is needed
this.table = tableLoader.loadTable();
}

private void commitUpToCheckpoint(
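The committer half of the change is smaller: after each completed checkpoint the table is reloaded so later commits see current metadata. A condensed, hypothetical stand-alone shape of that pattern, with illustrative helper names:

import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

public class CommitterReloadSketch {
  private final TableLoader tableLoader;
  private Table table;

  CommitterReloadSketch(TableLoader tableLoader, Table initialTable) {
    this.tableLoader = tableLoader;
    this.table = initialTable;
  }

  // After a checkpoint's results are committed, reload the table so that
  // commit-time settings reflect current table metadata rather than the
  // metadata captured at job start.
  void onCheckpointComplete(long checkpointId) {
    // ... commit pending data files up to checkpointId (elided) ...
    this.table = tableLoader.loadTable();
  }
}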