From f6170cc2ea8eace4d33a685a69d24d996d767c59 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Thu, 21 Sep 2023 13:03:35 -0700 Subject: [PATCH] Flink: port table refresh (#8555) to v1.15 and v1.16 --- flink/v1.15/build.gradle | 5 + .../apache/iceberg/flink/FlinkConfParser.java | 29 ++++++ .../apache/iceberg/flink/FlinkWriteConf.java | 18 ++++ .../iceberg/flink/FlinkWriteOptions.java | 6 ++ .../org/apache/iceberg/flink/TableLoader.java | 14 +++ .../flink/sink/CachingTableSupplier.java | 91 +++++++++++++++++++ .../iceberg/flink/sink/FlinkManifestUtil.java | 18 ++-- .../apache/iceberg/flink/sink/FlinkSink.java | 33 +++++-- .../flink/sink/IcebergFilesCommitter.java | 5 +- .../flink/sink/ManifestOutputFileFactory.java | 16 ++-- .../flink/sink/RowDataTaskWriterFactory.java | 71 +++++++++++++-- .../iceberg/flink/TestFlinkConfParser.java | 61 +++++++++++++ .../apache/iceberg/flink/TestTableLoader.java | 5 + .../flink/sink/TestCachingTableSupplier.java | 81 +++++++++++++++++ .../flink/sink/TestCompressionSettings.java | 2 +- .../flink/sink/TestFlinkIcebergSink.java | 25 +++++ .../iceberg/flink/sink/TestFlinkManifest.java | 19 ++-- .../flink/sink/TestIcebergStreamWriter.java | 2 +- flink/v1.16/build.gradle | 5 + .../apache/iceberg/flink/FlinkConfParser.java | 29 ++++++ .../apache/iceberg/flink/FlinkWriteConf.java | 18 ++++ .../iceberg/flink/FlinkWriteOptions.java | 6 ++ .../org/apache/iceberg/flink/TableLoader.java | 14 +++ .../flink/sink/CachingTableSupplier.java | 91 +++++++++++++++++++ .../iceberg/flink/sink/FlinkManifestUtil.java | 18 ++-- .../apache/iceberg/flink/sink/FlinkSink.java | 33 +++++-- .../flink/sink/IcebergFilesCommitter.java | 5 +- .../flink/sink/ManifestOutputFileFactory.java | 16 ++-- .../flink/sink/RowDataTaskWriterFactory.java | 71 +++++++++++++-- .../iceberg/flink/TestFlinkConfParser.java | 61 +++++++++++++ .../apache/iceberg/flink/TestTableLoader.java | 5 + .../flink/sink/TestCachingTableSupplier.java | 81 +++++++++++++++++ .../flink/sink/TestCompressionSettings.java | 2 +- .../flink/sink/TestFlinkIcebergSink.java | 25 +++++ .../iceberg/flink/sink/TestFlinkManifest.java | 19 ++-- .../flink/sink/TestIcebergStreamWriter.java | 2 +- 36 files changed, 900 insertions(+), 102 deletions(-) create mode 100644 flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java create mode 100644 flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkConfParser.java create mode 100644 flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java create mode 100644 flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkConfParser.java create mode 100644 flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java diff --git a/flink/v1.15/build.gradle b/flink/v1.15/build.gradle index febc678c2bec..a77ec1b50ab3 100644 --- a/flink/v1.15/build.gradle +++ b/flink/v1.15/build.gradle @@ -114,6 +114,11 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { } testImplementation libs.awaitility + testImplementation libs.assertj.core + } + + test { + useJUnitPlatform() } } diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java index 65717089d0d8..7167859e600c 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java @@ -18,11 +18,13 @@ */ package org.apache.iceberg.flink; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.function.Function; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.TimeUtils; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -59,6 +61,10 @@ public StringConfParser stringConf() { return new StringConfParser(); } + public DurationConfParser durationConf() { + return new DurationConfParser(); + } + class BooleanConfParser extends ConfParser { private Boolean defaultValue; @@ -180,6 +186,29 @@ public E parseOptional() { } } + class DurationConfParser extends ConfParser { + private Duration defaultValue; + + @Override + protected DurationConfParser self() { + return this; + } + + public DurationConfParser defaultValue(Duration value) { + this.defaultValue = value; + return self(); + } + + public Duration parse() { + Preconditions.checkArgument(defaultValue != null, "Default value cannot be null"); + return parse(TimeUtils::parseDuration, defaultValue); + } + + public Duration parseOptional() { + return parse(TimeUtils::parseDuration, null); + } + } + abstract class ConfParser { private final List optionNames = Lists.newArrayList(); private String tablePropertyName; diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index aba23389f2fe..f1424b0d29c6 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -18,7 +18,9 @@ */ package org.apache.iceberg.flink; +import java.time.Duration; import java.util.Map; +import org.apache.calcite.linq4j.function.Experimental; import org.apache.flink.configuration.ReadableConfig; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; @@ -184,4 +186,20 @@ public String branch() { public Integer writeParallelism() { return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional(); } + + /** + * NOTE: This may be removed or changed in a future release. This value specifies the interval for + * refreshing the table instances in sink writer subtasks. If not specified then the default + * behavior is to not refresh the table. + * + * @return the interval for refreshing the table in sink writer subtasks + */ + @Experimental + public Duration tableRefreshInterval() { + return confParser + .durationConf() + .option(FlinkWriteOptions.TABLE_REFRSH_INTERVAL.key()) + .flinkConfig(FlinkWriteOptions.TABLE_REFRSH_INTERVAL) + .parseOptional(); + } } diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index ba0931318e0d..ddc93f27944e 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink; +import java.time.Duration; +import org.apache.calcite.linq4j.function.Experimental; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.iceberg.SnapshotRef; @@ -64,4 +66,8 @@ private FlinkWriteOptions() {} public static final ConfigOption WRITE_PARALLELISM = ConfigOptions.key("write-parallelism").intType().noDefaultValue(); + + @Experimental + public static final ConfigOption TABLE_REFRSH_INTERVAL = + ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue(); } diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index f18c5ccda1f6..da509451fee7 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -38,6 +38,8 @@ public interface TableLoader extends Closeable, Serializable, Cloneable { void open(); + boolean isOpen(); + Table loadTable(); /** Clone a TableLoader */ @@ -75,6 +77,11 @@ public void open() { tables = new HadoopTables(hadoopConf.get()); } + @Override + public boolean isOpen() { + return tables != null; + } + @Override public Table loadTable() { FlinkEnvironmentContext.init(); @@ -115,6 +122,11 @@ public void open() { catalog = catalogLoader.loadCatalog(); } + @Override + public boolean isOpen() { + return catalog != null; + } + @Override public Table loadTable() { FlinkEnvironmentContext.init(); @@ -126,6 +138,8 @@ public void close() throws IOException { if (catalog instanceof Closeable) { ((Closeable) catalog).close(); } + + catalog = null; } @Override diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java new file mode 100644 index 000000000000..e9f9786f9190 --- /dev/null +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.time.Duration; +import org.apache.flink.util.Preconditions; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SerializableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A table loader that will only reload a table after a certain interval has passed. WARNING: This + * table loader should be used carefully when used with writer tasks. It could result in heavy load + * on a catalog for jobs with many writers. + */ +class CachingTableSupplier implements SerializableSupplier { + + private static final Logger LOG = LoggerFactory.getLogger(CachingTableSupplier.class); + + private final Table initialTable; + private final TableLoader tableLoader; + private final Duration tableRefreshInterval; + private long lastLoadTimeMillis; + private transient Table table; + + CachingTableSupplier( + SerializableTable initialTable, TableLoader tableLoader, Duration tableRefreshInterval) { + Preconditions.checkArgument(initialTable != null, "initialTable cannot be null"); + Preconditions.checkArgument(tableLoader != null, "tableLoader cannot be null"); + Preconditions.checkArgument( + tableRefreshInterval != null, "tableRefreshInterval cannot be null"); + this.initialTable = initialTable; + this.table = initialTable; + this.tableLoader = tableLoader; + this.tableRefreshInterval = tableRefreshInterval; + this.lastLoadTimeMillis = System.currentTimeMillis(); + } + + @Override + public Table get() { + if (table == null) { + this.table = initialTable; + } + return table; + } + + Table initialTable() { + return initialTable; + } + + void refreshTable() { + if (System.currentTimeMillis() > lastLoadTimeMillis + tableRefreshInterval.toMillis()) { + try { + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + + this.table = tableLoader.loadTable(); + this.lastLoadTimeMillis = System.currentTimeMillis(); + + LOG.info( + "Table {} reloaded, next min load time threshold is {}", + table.name(), + DateTimeUtil.formatTimestampMillis( + lastLoadTimeMillis + tableRefreshInterval.toMillis())); + } catch (Exception e) { + LOG.warn("An error occurred reloading table {}, table was not reloaded", table.name(), e); + } + } + } +} diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 00d55f937cc4..c7e8a2dea7cb 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -24,13 +24,11 @@ import java.util.function.Supplier; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -64,16 +62,14 @@ static List readDataFiles( } static ManifestOutputFileFactory createOutputFileFactory( - Table table, String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) { - TableOperations ops = ((HasTableOperations) table).operations(); + Supplier
tableSupplier, + Map tableProps, + String flinkJobId, + String operatorUniqueId, + int subTaskId, + long attemptNumber) { return new ManifestOutputFileFactory( - ops, - table.io(), - table.properties(), - flinkJobId, - operatorUniqueId, - subTaskId, - attemptNumber); + tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, attemptNumber); } /** diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 023702790116..58828799255d 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,6 +68,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.SerializableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -330,7 +332,10 @@ private DataStreamSink chainIcebergOperators() { DataStream rowDataInput = inputCreator.apply(uidPrefix); if (table == null) { - tableLoader.open(); + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + try (TableLoader loader = tableLoader) { this.table = loader.loadTable(); } catch (IOException e) { @@ -462,8 +467,19 @@ private SingleOutputStreamOperator appendWriter( } } + SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table); + Duration tableRefreshInterval = flinkWriteConf.tableRefreshInterval(); + + SerializableSupplier
tableSupplier; + if (tableRefreshInterval != null) { + tableSupplier = + new CachingTableSupplier(serializableTable, tableLoader, tableRefreshInterval); + } else { + tableSupplier = () -> serializableTable; + } + IcebergStreamWriter streamWriter = - createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds); + createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds); int parallelism = flinkWriteConf.writeParallelism() == null @@ -580,24 +596,25 @@ static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) { } static IcebergStreamWriter createStreamWriter( - Table table, + SerializableSupplier
tableSupplier, FlinkWriteConf flinkWriteConf, RowType flinkRowType, List equalityFieldIds) { - Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null"); + Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null"); - Table serializableTable = SerializableTable.copyOf(table); + Table initTable = tableSupplier.get(); FileFormat format = flinkWriteConf.dataFileFormat(); TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory( - serializableTable, + tableSupplier, flinkRowType, flinkWriteConf.targetDataFileSize(), format, - writeProperties(table, format, flinkWriteConf), + writeProperties(initTable, format, flinkWriteConf), equalityFieldIds, flinkWriteConf.upsertMode()); - return new IcebergStreamWriter<>(table.name(), taskWriterFactory); + + return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory); } /** diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 3805ab298428..b9bceaa9311d 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -160,7 +160,7 @@ public void initializeState(StateInitializationContext context) throws Exception int attemptId = getRuntimeContext().getAttemptNumber(); this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory( - table, flinkJobId, operatorUniqueId, subTaskId, attemptId); + () -> table, table.properties(), flinkJobId, operatorUniqueId, subTaskId, attemptId); this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID; this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR); @@ -247,6 +247,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { checkpointId, maxCommittedCheckpointId); } + + // reload the table in case new configuration is needed + this.table = tableLoader.loadTable(); } private void commitUpToCheckpoint( diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java index 045e45a4ceae..da5e6e7627ae 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java @@ -20,9 +20,11 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Strings; @@ -31,8 +33,7 @@ class ManifestOutputFileFactory { // properties. static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location"; - private final TableOperations ops; - private final FileIO io; + private final Supplier
tableSupplier; private final Map props; private final String flinkJobId; private final String operatorUniqueId; @@ -41,15 +42,13 @@ class ManifestOutputFileFactory { private final AtomicInteger fileCount = new AtomicInteger(0); ManifestOutputFileFactory( - TableOperations ops, - FileIO io, + Supplier
tableSupplier, Map props, String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) { - this.ops = ops; - this.io = io; + this.tableSupplier = tableSupplier; this.props = props; this.flinkJobId = flinkJobId; this.operatorUniqueId = operatorUniqueId; @@ -71,6 +70,7 @@ private String generatePath(long checkpointId) { OutputFile create(long checkpointId) { String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION); + TableOperations ops = ((HasTableOperations) tableSupplier.get()).operations(); String newManifestFullPath; if (Strings.isNullOrEmpty(flinkManifestDir)) { @@ -81,7 +81,7 @@ OutputFile create(long checkpointId) { String.format("%s/%s", stripTrailingSlash(flinkManifestDir), generatePath(checkpointId)); } - return io.newOutputFile(newManifestFullPath); + return tableSupplier.get().io().newOutputFile(newManifestFullPath); } private static String stripTrailingSlash(String path) { diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index c624eb3f0276..67422a1afeb1 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; @@ -38,13 +39,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.SerializableSupplier; public class RowDataTaskWriterFactory implements TaskWriterFactory { - private final Table table; + private final Supplier
tableSupplier; private final Schema schema; private final RowType flinkSchema; private final PartitionSpec spec; - private final FileIO io; private final long targetFileSizeBytes; private final FileFormat format; private final List equalityFieldIds; @@ -61,11 +62,37 @@ public RowDataTaskWriterFactory( Map writeProperties, List equalityFieldIds, boolean upsert) { - this.table = table; + this( + () -> table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsert); + } + + public RowDataTaskWriterFactory( + SerializableSupplier
tableSupplier, + RowType flinkSchema, + long targetFileSizeBytes, + FileFormat format, + Map writeProperties, + List equalityFieldIds, + boolean upsert) { + this.tableSupplier = tableSupplier; + + Table table; + if (tableSupplier instanceof CachingTableSupplier) { + // rely on the initial table metadata for schema, etc., until schema evolution is supported + table = ((CachingTableSupplier) tableSupplier).initialTable(); + } else { + table = tableSupplier.get(); + } + this.schema = table.schema(); this.flinkSchema = flinkSchema; this.spec = table.spec(); - this.io = table.io(); this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; this.equalityFieldIds = equalityFieldIds; @@ -107,8 +134,21 @@ public RowDataTaskWriterFactory( @Override public void initialize(int taskId, int attemptId) { + Table table; + if (tableSupplier instanceof CachingTableSupplier) { + // rely on the initial table metadata for schema, etc., until schema evolution is supported + table = ((CachingTableSupplier) tableSupplier).initialTable(); + } else { + table = tableSupplier.get(); + } + + refreshTable(); + this.outputFileFactory = - OutputFileFactory.builderFor(table, taskId, attemptId).format(format).build(); + OutputFileFactory.builderFor(table, taskId, attemptId) + .format(format) + .ioSupplier(() -> tableSupplier.get().io()) + .build(); } @Override @@ -117,18 +157,25 @@ public TaskWriter create() { outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize()."); + refreshTable(); + if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { // Initialize a task writer to write INSERT only. if (spec.isUnpartitioned()) { return new UnpartitionedWriter<>( - spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes); + spec, + format, + appenderFactory, + outputFileFactory, + tableSupplier.get().io(), + targetFileSizeBytes); } else { return new RowDataPartitionedFanoutWriter( spec, format, appenderFactory, outputFileFactory, - io, + tableSupplier.get().io(), targetFileSizeBytes, schema, flinkSchema); @@ -141,7 +188,7 @@ public TaskWriter create() { format, appenderFactory, outputFileFactory, - io, + tableSupplier.get().io(), targetFileSizeBytes, schema, flinkSchema, @@ -153,7 +200,7 @@ public TaskWriter create() { format, appenderFactory, outputFileFactory, - io, + tableSupplier.get().io(), targetFileSizeBytes, schema, flinkSchema, @@ -163,6 +210,12 @@ public TaskWriter create() { } } + void refreshTable() { + if (tableSupplier instanceof CachingTableSupplier) { + ((CachingTableSupplier) tableSupplier).refreshTable(); + } + } + private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter { private final PartitionKey partitionKey; diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkConfParser.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkConfParser.java new file mode 100644 index 000000000000..4b6ac25ab8e3 --- /dev/null +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkConfParser.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +public class TestFlinkConfParser { + + @Test + public void testDurationConf() { + Map writeOptions = ImmutableMap.of("write-prop", "111s"); + + ConfigOption configOption = + ConfigOptions.key("conf-prop").durationType().noDefaultValue(); + Configuration flinkConf = new Configuration(); + flinkConf.setString(configOption.key(), "222s"); + + Table table = mock(Table.class); + when(table.properties()).thenReturn(ImmutableMap.of("table-prop", "333s")); + + FlinkConfParser confParser = new FlinkConfParser(table, writeOptions, flinkConf); + Duration defaultVal = Duration.ofMillis(999); + + Duration result = + confParser.durationConf().option("write-prop").defaultValue(defaultVal).parse(); + assertThat(result).isEqualTo(Duration.ofSeconds(111)); + + result = confParser.durationConf().flinkConfig(configOption).defaultValue(defaultVal).parse(); + assertThat(result).isEqualTo(Duration.ofSeconds(222)); + + result = confParser.durationConf().tableProperty("table-prop").defaultValue(defaultVal).parse(); + assertThat(result).isEqualTo(Duration.ofSeconds(333)); + } +} diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java index 93e97d5aa3bc..4ad302dde436 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java @@ -36,6 +36,11 @@ public TestTableLoader(String dir) { @Override public void open() {} + @Override + public boolean isOpen() { + return true; + } + @Override public Table loadTable() { return TestTables.load(dir, "test"); diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java new file mode 100644 index 000000000000..360db658cd2f --- /dev/null +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +public class TestCachingTableSupplier { + + @Test + public void testCheckArguments() { + SerializableTable initialTable = mock(SerializableTable.class); + + Table loadedTable = mock(Table.class); + TableLoader tableLoader = mock(TableLoader.class); + when(tableLoader.loadTable()).thenReturn(loadedTable); + + new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100)); + + assertThatThrownBy(() -> new CachingTableSupplier(initialTable, tableLoader, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("tableRefreshInterval cannot be null"); + assertThatThrownBy(() -> new CachingTableSupplier(null, tableLoader, Duration.ofMillis(100))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("initialTable cannot be null"); + assertThatThrownBy(() -> new CachingTableSupplier(initialTable, null, Duration.ofMillis(100))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("tableLoader cannot be null"); + } + + @Test + public void testTableReload() { + SerializableTable initialTable = mock(SerializableTable.class); + + Table loadedTable = mock(Table.class); + TableLoader tableLoader = mock(TableLoader.class); + when(tableLoader.loadTable()).thenReturn(loadedTable); + + CachingTableSupplier cachingTableSupplier = + new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100)); + + // refresh shouldn't do anything as the min reload interval hasn't passed + cachingTableSupplier.refreshTable(); + assertThat(cachingTableSupplier.get()).isEqualTo(initialTable); + + // refresh after waiting past the min reload interval + Awaitility.await() + .atLeast(100, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + cachingTableSupplier.refreshTable(); + assertThat(cachingTableSupplier.get()).isEqualTo(loadedTable); + }); + } +} diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java index 4d174866ca68..d9d57fb7107e 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -215,7 +215,7 @@ private static OneInputStreamOperatorTestHarness createIce icebergTable, override, new org.apache.flink.configuration.Configuration()); IcebergStreamWriter streamWriter = - FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null); + FlinkSink.createStreamWriter(() -> icebergTable, flinkWriteConfig, flinkRowType, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java index 23beb19a72f2..d0c394e35520 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; @@ -390,4 +391,28 @@ public void testOverrideWriteConfigWithUnknownFileFormat() { return null; }); } + + @Test + public void testWriteRowWithTableRefreshInterval() throws Exception { + List rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo")); + DataStream dataStream = + env.addSource(createBoundedSource(rows), ROW_TYPE_INFO) + .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + + Configuration flinkConf = new Configuration(); + flinkConf.setString(FlinkWriteOptions.TABLE_REFRSH_INTERVAL.key(), "100ms"); + + FlinkSink.forRowData(dataStream) + .table(table) + .tableLoader(tableLoader) + .flinkConf(flinkConf) + .writeParallelism(parallelism) + .append(); + + // Execute the program. + env.execute("Test Iceberg DataStream"); + + // Assert the iceberg table's records. + SimpleDataUtil.assertTableRows(table, convertToRowData(rows)); + } } diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index f171485a90f7..ce1f208a4b07 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -34,7 +34,6 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.Table; @@ -94,7 +93,8 @@ public void testIO() throws IOException { String operatorId = newOperatorUniqueId(); for (long checkpointId = 1; checkpointId <= 3; checkpointId++) { ManifestOutputFileFactory factory = - FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1); + FlinkManifestUtil.createOutputFileFactory( + () -> table, table.properties(), flinkJobId, operatorId, 1, 1); final long curCkpId = checkpointId; List dataFiles = generateDataFiles(10); @@ -135,14 +135,7 @@ public void testUserProvidedManifestLocation() throws IOException { Map props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///"); ManifestOutputFileFactory factory = - new ManifestOutputFileFactory( - ((HasTableOperations) table).operations(), - table.io(), - props, - flinkJobId, - operatorId, - 1, - 1); + new ManifestOutputFileFactory(() -> table, props, flinkJobId, operatorId, 1, 1); List dataFiles = generateDataFiles(5); DeltaManifests deltaManifests = @@ -177,7 +170,8 @@ public void testVersionedSerializer() throws IOException { String flinkJobId = newFlinkJobId(); String operatorId = newOperatorUniqueId(); ManifestOutputFileFactory factory = - FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1); + FlinkManifestUtil.createOutputFileFactory( + () -> table, table.properties(), flinkJobId, operatorId, 1, 1); List dataFiles = generateDataFiles(10); List eqDeleteFiles = generateEqDeleteFiles(10); @@ -214,7 +208,8 @@ public void testCompatibility() throws IOException { String flinkJobId = newFlinkJobId(); String operatorId = newOperatorUniqueId(); ManifestOutputFileFactory factory = - FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1); + FlinkManifestUtil.createOutputFileFactory( + () -> table, table.properties(), flinkJobId, operatorId, 1, 1); List dataFiles = generateDataFiles(10); ManifestFile manifest = diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index fa69c5d4d1fd..0968f89f55e0 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -376,7 +376,7 @@ private OneInputStreamOperatorTestHarness createIcebergStr icebergTable, Maps.newHashMap(), new org.apache.flink.configuration.Configuration()); IcebergStreamWriter streamWriter = - FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null); + FlinkSink.createStreamWriter(() -> icebergTable, flinkWriteConfig, flinkRowType, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); diff --git a/flink/v1.16/build.gradle b/flink/v1.16/build.gradle index bdddc8bf3e74..2d5e6f326b01 100644 --- a/flink/v1.16/build.gradle +++ b/flink/v1.16/build.gradle @@ -114,6 +114,11 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { } testImplementation libs.awaitility + testImplementation libs.assertj.core + } + + test { + useJUnitPlatform() } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java index 65717089d0d8..7167859e600c 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java @@ -18,11 +18,13 @@ */ package org.apache.iceberg.flink; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.function.Function; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.TimeUtils; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -59,6 +61,10 @@ public StringConfParser stringConf() { return new StringConfParser(); } + public DurationConfParser durationConf() { + return new DurationConfParser(); + } + class BooleanConfParser extends ConfParser { private Boolean defaultValue; @@ -180,6 +186,29 @@ public E parseOptional() { } } + class DurationConfParser extends ConfParser { + private Duration defaultValue; + + @Override + protected DurationConfParser self() { + return this; + } + + public DurationConfParser defaultValue(Duration value) { + this.defaultValue = value; + return self(); + } + + public Duration parse() { + Preconditions.checkArgument(defaultValue != null, "Default value cannot be null"); + return parse(TimeUtils::parseDuration, defaultValue); + } + + public Duration parseOptional() { + return parse(TimeUtils::parseDuration, null); + } + } + abstract class ConfParser { private final List optionNames = Lists.newArrayList(); private String tablePropertyName; diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index aba23389f2fe..f1424b0d29c6 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -18,7 +18,9 @@ */ package org.apache.iceberg.flink; +import java.time.Duration; import java.util.Map; +import org.apache.calcite.linq4j.function.Experimental; import org.apache.flink.configuration.ReadableConfig; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; @@ -184,4 +186,20 @@ public String branch() { public Integer writeParallelism() { return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional(); } + + /** + * NOTE: This may be removed or changed in a future release. This value specifies the interval for + * refreshing the table instances in sink writer subtasks. If not specified then the default + * behavior is to not refresh the table. + * + * @return the interval for refreshing the table in sink writer subtasks + */ + @Experimental + public Duration tableRefreshInterval() { + return confParser + .durationConf() + .option(FlinkWriteOptions.TABLE_REFRSH_INTERVAL.key()) + .flinkConfig(FlinkWriteOptions.TABLE_REFRSH_INTERVAL) + .parseOptional(); + } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index ba0931318e0d..ddc93f27944e 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink; +import java.time.Duration; +import org.apache.calcite.linq4j.function.Experimental; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.iceberg.SnapshotRef; @@ -64,4 +66,8 @@ private FlinkWriteOptions() {} public static final ConfigOption WRITE_PARALLELISM = ConfigOptions.key("write-parallelism").intType().noDefaultValue(); + + @Experimental + public static final ConfigOption TABLE_REFRSH_INTERVAL = + ConfigOptions.key("table-refresh-interval").durationType().noDefaultValue(); } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index f18c5ccda1f6..da509451fee7 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -38,6 +38,8 @@ public interface TableLoader extends Closeable, Serializable, Cloneable { void open(); + boolean isOpen(); + Table loadTable(); /** Clone a TableLoader */ @@ -75,6 +77,11 @@ public void open() { tables = new HadoopTables(hadoopConf.get()); } + @Override + public boolean isOpen() { + return tables != null; + } + @Override public Table loadTable() { FlinkEnvironmentContext.init(); @@ -115,6 +122,11 @@ public void open() { catalog = catalogLoader.loadCatalog(); } + @Override + public boolean isOpen() { + return catalog != null; + } + @Override public Table loadTable() { FlinkEnvironmentContext.init(); @@ -126,6 +138,8 @@ public void close() throws IOException { if (catalog instanceof Closeable) { ((Closeable) catalog).close(); } + + catalog = null; } @Override diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java new file mode 100644 index 000000000000..e9f9786f9190 --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.time.Duration; +import org.apache.flink.util.Preconditions; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SerializableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A table loader that will only reload a table after a certain interval has passed. WARNING: This + * table loader should be used carefully when used with writer tasks. It could result in heavy load + * on a catalog for jobs with many writers. + */ +class CachingTableSupplier implements SerializableSupplier
{ + + private static final Logger LOG = LoggerFactory.getLogger(CachingTableSupplier.class); + + private final Table initialTable; + private final TableLoader tableLoader; + private final Duration tableRefreshInterval; + private long lastLoadTimeMillis; + private transient Table table; + + CachingTableSupplier( + SerializableTable initialTable, TableLoader tableLoader, Duration tableRefreshInterval) { + Preconditions.checkArgument(initialTable != null, "initialTable cannot be null"); + Preconditions.checkArgument(tableLoader != null, "tableLoader cannot be null"); + Preconditions.checkArgument( + tableRefreshInterval != null, "tableRefreshInterval cannot be null"); + this.initialTable = initialTable; + this.table = initialTable; + this.tableLoader = tableLoader; + this.tableRefreshInterval = tableRefreshInterval; + this.lastLoadTimeMillis = System.currentTimeMillis(); + } + + @Override + public Table get() { + if (table == null) { + this.table = initialTable; + } + return table; + } + + Table initialTable() { + return initialTable; + } + + void refreshTable() { + if (System.currentTimeMillis() > lastLoadTimeMillis + tableRefreshInterval.toMillis()) { + try { + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + + this.table = tableLoader.loadTable(); + this.lastLoadTimeMillis = System.currentTimeMillis(); + + LOG.info( + "Table {} reloaded, next min load time threshold is {}", + table.name(), + DateTimeUtil.formatTimestampMillis( + lastLoadTimeMillis + tableRefreshInterval.toMillis())); + } catch (Exception e) { + LOG.warn("An error occurred reloading table {}, table was not reloaded", table.name(), e); + } + } + } +} diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 00d55f937cc4..c7e8a2dea7cb 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -24,13 +24,11 @@ import java.util.function.Supplier; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -64,16 +62,14 @@ static List readDataFiles( } static ManifestOutputFileFactory createOutputFileFactory( - Table table, String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) { - TableOperations ops = ((HasTableOperations) table).operations(); + Supplier
tableSupplier, + Map tableProps, + String flinkJobId, + String operatorUniqueId, + int subTaskId, + long attemptNumber) { return new ManifestOutputFileFactory( - ops, - table.io(), - table.properties(), - flinkJobId, - operatorUniqueId, - subTaskId, - attemptNumber); + tableSupplier, tableProps, flinkJobId, operatorUniqueId, subTaskId, attemptNumber); } /** diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 023702790116..58828799255d 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,6 +68,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.SerializableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -330,7 +332,10 @@ private DataStreamSink chainIcebergOperators() { DataStream rowDataInput = inputCreator.apply(uidPrefix); if (table == null) { - tableLoader.open(); + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + try (TableLoader loader = tableLoader) { this.table = loader.loadTable(); } catch (IOException e) { @@ -462,8 +467,19 @@ private SingleOutputStreamOperator appendWriter( } } + SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table); + Duration tableRefreshInterval = flinkWriteConf.tableRefreshInterval(); + + SerializableSupplier
tableSupplier; + if (tableRefreshInterval != null) { + tableSupplier = + new CachingTableSupplier(serializableTable, tableLoader, tableRefreshInterval); + } else { + tableSupplier = () -> serializableTable; + } + IcebergStreamWriter streamWriter = - createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds); + createStreamWriter(tableSupplier, flinkWriteConf, flinkRowType, equalityFieldIds); int parallelism = flinkWriteConf.writeParallelism() == null @@ -580,24 +596,25 @@ static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) { } static IcebergStreamWriter createStreamWriter( - Table table, + SerializableSupplier
tableSupplier, FlinkWriteConf flinkWriteConf, RowType flinkRowType, List equalityFieldIds) { - Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null"); + Preconditions.checkArgument(tableSupplier != null, "Iceberg table supplier shouldn't be null"); - Table serializableTable = SerializableTable.copyOf(table); + Table initTable = tableSupplier.get(); FileFormat format = flinkWriteConf.dataFileFormat(); TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory( - serializableTable, + tableSupplier, flinkRowType, flinkWriteConf.targetDataFileSize(), format, - writeProperties(table, format, flinkWriteConf), + writeProperties(initTable, format, flinkWriteConf), equalityFieldIds, flinkWriteConf.upsertMode()); - return new IcebergStreamWriter<>(table.name(), taskWriterFactory); + + return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory); } /** diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 3805ab298428..b9bceaa9311d 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -160,7 +160,7 @@ public void initializeState(StateInitializationContext context) throws Exception int attemptId = getRuntimeContext().getAttemptNumber(); this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory( - table, flinkJobId, operatorUniqueId, subTaskId, attemptId); + () -> table, table.properties(), flinkJobId, operatorUniqueId, subTaskId, attemptId); this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID; this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR); @@ -247,6 +247,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { checkpointId, maxCommittedCheckpointId); } + + // reload the table in case new configuration is needed + this.table = tableLoader.loadTable(); } private void commitUpToCheckpoint( diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java index 045e45a4ceae..da5e6e7627ae 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java @@ -20,9 +20,11 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Strings; @@ -31,8 +33,7 @@ class ManifestOutputFileFactory { // properties. static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location"; - private final TableOperations ops; - private final FileIO io; + private final Supplier
tableSupplier; private final Map props; private final String flinkJobId; private final String operatorUniqueId; @@ -41,15 +42,13 @@ class ManifestOutputFileFactory { private final AtomicInteger fileCount = new AtomicInteger(0); ManifestOutputFileFactory( - TableOperations ops, - FileIO io, + Supplier
tableSupplier, Map props, String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) { - this.ops = ops; - this.io = io; + this.tableSupplier = tableSupplier; this.props = props; this.flinkJobId = flinkJobId; this.operatorUniqueId = operatorUniqueId; @@ -71,6 +70,7 @@ private String generatePath(long checkpointId) { OutputFile create(long checkpointId) { String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION); + TableOperations ops = ((HasTableOperations) tableSupplier.get()).operations(); String newManifestFullPath; if (Strings.isNullOrEmpty(flinkManifestDir)) { @@ -81,7 +81,7 @@ OutputFile create(long checkpointId) { String.format("%s/%s", stripTrailingSlash(flinkManifestDir), generatePath(checkpointId)); } - return io.newOutputFile(newManifestFullPath); + return tableSupplier.get().io().newOutputFile(newManifestFullPath); } private static String stripTrailingSlash(String path) { diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index c624eb3f0276..67422a1afeb1 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; @@ -38,13 +39,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.SerializableSupplier; public class RowDataTaskWriterFactory implements TaskWriterFactory { - private final Table table; + private final Supplier
tableSupplier; private final Schema schema; private final RowType flinkSchema; private final PartitionSpec spec; - private final FileIO io; private final long targetFileSizeBytes; private final FileFormat format; private final List equalityFieldIds; @@ -61,11 +62,37 @@ public RowDataTaskWriterFactory( Map writeProperties, List equalityFieldIds, boolean upsert) { - this.table = table; + this( + () -> table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsert); + } + + public RowDataTaskWriterFactory( + SerializableSupplier
tableSupplier, + RowType flinkSchema, + long targetFileSizeBytes, + FileFormat format, + Map writeProperties, + List equalityFieldIds, + boolean upsert) { + this.tableSupplier = tableSupplier; + + Table table; + if (tableSupplier instanceof CachingTableSupplier) { + // rely on the initial table metadata for schema, etc., until schema evolution is supported + table = ((CachingTableSupplier) tableSupplier).initialTable(); + } else { + table = tableSupplier.get(); + } + this.schema = table.schema(); this.flinkSchema = flinkSchema; this.spec = table.spec(); - this.io = table.io(); this.targetFileSizeBytes = targetFileSizeBytes; this.format = format; this.equalityFieldIds = equalityFieldIds; @@ -107,8 +134,21 @@ public RowDataTaskWriterFactory( @Override public void initialize(int taskId, int attemptId) { + Table table; + if (tableSupplier instanceof CachingTableSupplier) { + // rely on the initial table metadata for schema, etc., until schema evolution is supported + table = ((CachingTableSupplier) tableSupplier).initialTable(); + } else { + table = tableSupplier.get(); + } + + refreshTable(); + this.outputFileFactory = - OutputFileFactory.builderFor(table, taskId, attemptId).format(format).build(); + OutputFileFactory.builderFor(table, taskId, attemptId) + .format(format) + .ioSupplier(() -> tableSupplier.get().io()) + .build(); } @Override @@ -117,18 +157,25 @@ public TaskWriter create() { outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize()."); + refreshTable(); + if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { // Initialize a task writer to write INSERT only. if (spec.isUnpartitioned()) { return new UnpartitionedWriter<>( - spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes); + spec, + format, + appenderFactory, + outputFileFactory, + tableSupplier.get().io(), + targetFileSizeBytes); } else { return new RowDataPartitionedFanoutWriter( spec, format, appenderFactory, outputFileFactory, - io, + tableSupplier.get().io(), targetFileSizeBytes, schema, flinkSchema); @@ -141,7 +188,7 @@ public TaskWriter create() { format, appenderFactory, outputFileFactory, - io, + tableSupplier.get().io(), targetFileSizeBytes, schema, flinkSchema, @@ -153,7 +200,7 @@ public TaskWriter create() { format, appenderFactory, outputFileFactory, - io, + tableSupplier.get().io(), targetFileSizeBytes, schema, flinkSchema, @@ -163,6 +210,12 @@ public TaskWriter create() { } } + void refreshTable() { + if (tableSupplier instanceof CachingTableSupplier) { + ((CachingTableSupplier) tableSupplier).refreshTable(); + } + } + private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter { private final PartitionKey partitionKey; diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkConfParser.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkConfParser.java new file mode 100644 index 000000000000..4b6ac25ab8e3 --- /dev/null +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkConfParser.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +public class TestFlinkConfParser { + + @Test + public void testDurationConf() { + Map writeOptions = ImmutableMap.of("write-prop", "111s"); + + ConfigOption configOption = + ConfigOptions.key("conf-prop").durationType().noDefaultValue(); + Configuration flinkConf = new Configuration(); + flinkConf.setString(configOption.key(), "222s"); + + Table table = mock(Table.class); + when(table.properties()).thenReturn(ImmutableMap.of("table-prop", "333s")); + + FlinkConfParser confParser = new FlinkConfParser(table, writeOptions, flinkConf); + Duration defaultVal = Duration.ofMillis(999); + + Duration result = + confParser.durationConf().option("write-prop").defaultValue(defaultVal).parse(); + assertThat(result).isEqualTo(Duration.ofSeconds(111)); + + result = confParser.durationConf().flinkConfig(configOption).defaultValue(defaultVal).parse(); + assertThat(result).isEqualTo(Duration.ofSeconds(222)); + + result = confParser.durationConf().tableProperty("table-prop").defaultValue(defaultVal).parse(); + assertThat(result).isEqualTo(Duration.ofSeconds(333)); + } +} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java index 93e97d5aa3bc..4ad302dde436 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java @@ -36,6 +36,11 @@ public TestTableLoader(String dir) { @Override public void open() {} + @Override + public boolean isOpen() { + return true; + } + @Override public Table loadTable() { return TestTables.load(dir, "test"); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java new file mode 100644 index 000000000000..360db658cd2f --- /dev/null +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +public class TestCachingTableSupplier { + + @Test + public void testCheckArguments() { + SerializableTable initialTable = mock(SerializableTable.class); + + Table loadedTable = mock(Table.class); + TableLoader tableLoader = mock(TableLoader.class); + when(tableLoader.loadTable()).thenReturn(loadedTable); + + new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100)); + + assertThatThrownBy(() -> new CachingTableSupplier(initialTable, tableLoader, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("tableRefreshInterval cannot be null"); + assertThatThrownBy(() -> new CachingTableSupplier(null, tableLoader, Duration.ofMillis(100))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("initialTable cannot be null"); + assertThatThrownBy(() -> new CachingTableSupplier(initialTable, null, Duration.ofMillis(100))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("tableLoader cannot be null"); + } + + @Test + public void testTableReload() { + SerializableTable initialTable = mock(SerializableTable.class); + + Table loadedTable = mock(Table.class); + TableLoader tableLoader = mock(TableLoader.class); + when(tableLoader.loadTable()).thenReturn(loadedTable); + + CachingTableSupplier cachingTableSupplier = + new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100)); + + // refresh shouldn't do anything as the min reload interval hasn't passed + cachingTableSupplier.refreshTable(); + assertThat(cachingTableSupplier.get()).isEqualTo(initialTable); + + // refresh after waiting past the min reload interval + Awaitility.await() + .atLeast(100, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + cachingTableSupplier.refreshTable(); + assertThat(cachingTableSupplier.get()).isEqualTo(loadedTable); + }); + } +} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java index 3f1172a19cc0..214c3de1063e 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java @@ -215,7 +215,7 @@ private static OneInputStreamOperatorTestHarness createIce icebergTable, override, new org.apache.flink.configuration.Configuration()); IcebergStreamWriter streamWriter = - FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null); + FlinkSink.createStreamWriter(() -> icebergTable, flinkWriteConfig, flinkRowType, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0); diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java index 23beb19a72f2..d0c394e35520 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; @@ -390,4 +391,28 @@ public void testOverrideWriteConfigWithUnknownFileFormat() { return null; }); } + + @Test + public void testWriteRowWithTableRefreshInterval() throws Exception { + List rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo")); + DataStream dataStream = + env.addSource(createBoundedSource(rows), ROW_TYPE_INFO) + .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + + Configuration flinkConf = new Configuration(); + flinkConf.setString(FlinkWriteOptions.TABLE_REFRSH_INTERVAL.key(), "100ms"); + + FlinkSink.forRowData(dataStream) + .table(table) + .tableLoader(tableLoader) + .flinkConf(flinkConf) + .writeParallelism(parallelism) + .append(); + + // Execute the program. + env.execute("Test Iceberg DataStream"); + + // Assert the iceberg table's records. + SimpleDataUtil.assertTableRows(table, convertToRowData(rows)); + } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index f171485a90f7..ce1f208a4b07 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -34,7 +34,6 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.Table; @@ -94,7 +93,8 @@ public void testIO() throws IOException { String operatorId = newOperatorUniqueId(); for (long checkpointId = 1; checkpointId <= 3; checkpointId++) { ManifestOutputFileFactory factory = - FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1); + FlinkManifestUtil.createOutputFileFactory( + () -> table, table.properties(), flinkJobId, operatorId, 1, 1); final long curCkpId = checkpointId; List dataFiles = generateDataFiles(10); @@ -135,14 +135,7 @@ public void testUserProvidedManifestLocation() throws IOException { Map props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///"); ManifestOutputFileFactory factory = - new ManifestOutputFileFactory( - ((HasTableOperations) table).operations(), - table.io(), - props, - flinkJobId, - operatorId, - 1, - 1); + new ManifestOutputFileFactory(() -> table, props, flinkJobId, operatorId, 1, 1); List dataFiles = generateDataFiles(5); DeltaManifests deltaManifests = @@ -177,7 +170,8 @@ public void testVersionedSerializer() throws IOException { String flinkJobId = newFlinkJobId(); String operatorId = newOperatorUniqueId(); ManifestOutputFileFactory factory = - FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1); + FlinkManifestUtil.createOutputFileFactory( + () -> table, table.properties(), flinkJobId, operatorId, 1, 1); List dataFiles = generateDataFiles(10); List eqDeleteFiles = generateEqDeleteFiles(10); @@ -214,7 +208,8 @@ public void testCompatibility() throws IOException { String flinkJobId = newFlinkJobId(); String operatorId = newOperatorUniqueId(); ManifestOutputFileFactory factory = - FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1); + FlinkManifestUtil.createOutputFileFactory( + () -> table, table.properties(), flinkJobId, operatorId, 1, 1); List dataFiles = generateDataFiles(10); ManifestFile manifest = diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java index fa69c5d4d1fd..0968f89f55e0 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java @@ -376,7 +376,7 @@ private OneInputStreamOperatorTestHarness createIcebergStr icebergTable, Maps.newHashMap(), new org.apache.flink.configuration.Configuration()); IcebergStreamWriter streamWriter = - FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null); + FlinkSink.createStreamWriter(() -> icebergTable, flinkWriteConfig, flinkRowType, null); OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0);