From a1e4c52e8191139f1af022ada0cb8046306dc118 Mon Sep 17 00:00:00 2001
From: Rodrigo Meneses
Date: Mon, 6 May 2024 11:01:09 -0700
Subject: [PATCH] A new implementation of an Iceberg Sink [WIP] that will be
 used with upcoming Flink Compaction jobs

---
 .../flink/sink/BaseDeltaTaskWriter.java       |  11 +-
 .../iceberg/flink/sink/IcebergSink.java       |  21 ++-
 .../flink/sink/PartitionedDeltaWriter.java    |   2 +-
 .../flink/sink/SimpleTableSupplier.java       |  45 -------
 .../flink/sink/UnpartitionedDeltaWriter.java  |   2 +-
 .../committer/IcebergFlinkManifestUtil.java   |  94 -------------
 .../flink/sink/committer/SinkAggregator.java  |  64 ++++++++-
 .../flink/sink/committer/SinkCommitter.java   |  40 +++++-
 .../sink/writer/BaseDeltaTaskWriter.java      | 125 ------------------
 .../sink/writer/PartitionedDeltaWriter.java   |   3 +-
 .../sink/writer/UnpartitionedDeltaWriter.java |   3 +-
 .../committer/TestIcebergFlinkManifest.java   |  23 ++--
 12 files changed, 143 insertions(+), 290 deletions(-)
 delete mode 100644 flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/SimpleTableSupplier.java
 delete mode 100644 flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/BaseDeltaTaskWriter.java

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index e8a46c5becd7..768f7c35c40c 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -38,7 +38,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 
-abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
+public abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
 
   private final Schema schema;
   private final Schema deleteSchema;
@@ -47,7 +47,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
   private final RowDataProjection keyProjection;
   private final boolean upsert;
 
-  BaseDeltaTaskWriter(
+  protected BaseDeltaTaskWriter(
       PartitionSpec spec,
       FileFormat format,
       FileAppenderFactory<RowData> appenderFactory,
@@ -69,9 +69,9 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
     this.upsert = upsert;
   }
 
-  abstract RowDataDeltaWriter route(RowData row);
+  protected abstract RowDataDeltaWriter route(RowData row);
 
-  RowDataWrapper wrapper() {
+  protected RowDataWrapper wrapper() {
     return wrapper;
   }
 
@@ -109,7 +109,8 @@ public void write(RowData row) throws IOException {
   }
 
   protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
-    RowDataDeltaWriter(PartitionKey partition) {
+
+    public RowDataDeltaWriter(PartitionKey partition) {
       super(partition, schema, deleteSchema, DeleteGranularity.FILE);
     }
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 3b93f1e48979..59e26fee4f07 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -61,6 +61,7 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
@@ -82,7 +83,6 @@
 import org.apache.iceberg.flink.sink.writer.RowDataTaskWriterFactory;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -768,4 +768,23 @@ public static IcebergSink.Builder forRowData(DataStream<RowData> input) {
   static String prefixIfNotNull(String uidPrefix, String suffix) {
     return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
   }
+
+  /**
+   * A table supplier that always returns the table instance captured when the sink was created and
+   * never reloads it. Writer and committer tasks that use this supplier will not observe metadata
+   * changes (such as schema or partition spec updates) made to the table after job submission.
+   */
+  static class SimpleTableSupplier implements SerializableSupplier<Table> {
+    private final SerializableTable table;
+
+    SimpleTableSupplier(SerializableTable initialTable) {
+      Preconditions.checkArgument(initialTable != null, "initialTable cannot be null");
+      this.table = initialTable;
+    }
+
+    @Override
+    public Table get() {
+      return table;
+    }
+  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index 38062dd1a2c4..85ded52ac897 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -66,7 +66,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
   }
 
   @Override
-  RowDataDeltaWriter route(RowData row) {
+  protected RowDataDeltaWriter route(RowData row) {
     partitionKey.partition(wrapper().wrap(row));
 
     RowDataDeltaWriter writer = writers.get(partitionKey);
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/SimpleTableSupplier.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/SimpleTableSupplier.java
deleted file mode 100644
index 28e692d56e71..000000000000
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/SimpleTableSupplier.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.flink.sink;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
-import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.util.SerializableSupplier;
-
-/**
- * A table loader that will only reload a table after a certain interval has passed. WARNING: This
- * table loader should be used carefully when used with writer tasks. It could result in heavy load
- * on a catalog for jobs with many writers.
- */
-@Internal
-public class SimpleTableSupplier implements SerializableSupplier<Table> {
-  private final SerializableTable table;
-
-  SimpleTableSupplier(SerializableTable initialTable) {
-    Preconditions.checkArgument(initialTable != null, "initialTable cannot be null");
-    this.table = initialTable;
-  }
-
-  @Override
-  public Table get() {
-    return table;
-  }
-}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index 7680fb933b20..d12f0c11c04f 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -58,7 +58,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
   }
 
   @Override
-  RowDataDeltaWriter route(RowData row) {
+  protected RowDataDeltaWriter route(RowData row) {
     return writer;
   }
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/IcebergFlinkManifestUtil.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/IcebergFlinkManifestUtil.java
index ab4af431a72a..f13d67e64c2a 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/IcebergFlinkManifestUtil.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/IcebergFlinkManifestUtil.java
@@ -18,110 +18,16 @@
  */
 package org.apache.iceberg.flink.sink.committer;
 
-import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestWriter;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.sink.DeltaManifests;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.io.WriteResult;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 public class IcebergFlinkManifestUtil {
-  private static final int FORMAT_V2 = 2;
-  private static final Long DUMMY_SNAPSHOT_ID = 0L;
 
   private IcebergFlinkManifestUtil() {}
 
-  static ManifestFile writeDataFiles(
-      OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) throws IOException {
-    ManifestWriter<DataFile> writer =
-        ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
-
-    try (ManifestWriter<DataFile> closeableWriter = writer) {
-      closeableWriter.addAll(dataFiles);
-    }
-
-    return writer.toManifestFile();
-  }
-
-  static List<DataFile> readDataFiles(
-      ManifestFile manifestFile, FileIO io, Map<Integer, PartitionSpec> specsById)
-      throws IOException {
-    try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io, specsById)) {
-      return Lists.newArrayList(dataFiles);
-    }
-  }
-
   public static IcebergManifestOutputFileFactory createOutputFileFactory(
       Supplier<Table> tableSupplier, Map<String, String> props, String prefix) {
     return new IcebergManifestOutputFileFactory(tableSupplier, props, prefix);
   }
-
-  /**
-   * Write the {@link WriteResult} to temporary manifest files.
-   *
-   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
-   *     partition spec
-   */
-  public static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
-      throws IOException {
-
-    ManifestFile dataManifest = null;
-    ManifestFile deleteManifest = null;
-
-    // Write the completed data files into a newly created data manifest file.
-    if (result.dataFiles() != null && result.dataFiles().length > 0) {
-      dataManifest =
-          writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
-    }
-
-    // Write the completed delete files into a newly created delete manifest file.
-    if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
-      OutputFile deleteManifestFile = outputFileSupplier.get();
-
-      ManifestWriter<DeleteFile> deleteManifestWriter =
-          ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
-      try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
-        for (DeleteFile deleteFile : result.deleteFiles()) {
-          writer.add(deleteFile);
-        }
-      }
-
-      deleteManifest = deleteManifestWriter.toManifestFile();
-    }
-
-    return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
-  }
-
-  public static WriteResult readCompletedFiles(
-      DeltaManifests deltaManifests, FileIO io, Map<Integer, PartitionSpec> specsById)
-      throws IOException {
-    WriteResult.Builder builder = WriteResult.builder();
-
-    // Read the completed data files from persisted data manifest file.
-    if (deltaManifests.dataManifest() != null) {
-      builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io, specsById));
-    }
-
-    // Read the completed delete files from persisted delete manifests file.
-    if (deltaManifests.deleteManifest() != null) {
-      try (CloseableIterable<DeleteFile> deleteFiles =
-          ManifestFiles.readDeleteManifest(deltaManifests.deleteManifest(), io, specsById)) {
-        builder.addDeleteFiles(deleteFiles);
-      }
-    }
-
-    return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build();
-  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/SinkAggregator.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/SinkAggregator.java
index fafcb7f84d77..3ce6e51518c4 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/SinkAggregator.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/SinkAggregator.java
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
+import java.util.function.Supplier;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -27,11 +29,19 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.sink.DeltaManifests;
 import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +64,9 @@ public class SinkAggregator extends AbstractStreamOperator<CommittableMessage<byte[]>>
+  private static final int FORMAT_V2 = 2;
+  private static final Long DUMMY_SNAPSHOT_ID = 0L;
+
   public byte[] writeToManifest(Collection<WriteResult> writeResults, long checkpointId)
       throws IOException {
     WriteResult result = WriteResult.builder().addAll(writeResults).build();
 
     DeltaManifests deltaManifests =
-        IcebergFlinkManifestUtil.writeCompletedFiles(
+        writeCompletedFiles(
             result, () -> icebergManifestOutputFileFactory.create(checkpointId), table.spec());
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
   }
+
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+   *     partition spec
+   */
+  public static DeltaManifests writeCompletedFiles(
+      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      throws IOException {
+
+    ManifestFile dataManifest = null;
+    ManifestFile deleteManifest = null;
+
+    // Write the completed data files into a newly created data manifest file.
+    if (result.dataFiles() != null && result.dataFiles().length > 0) {
+      dataManifest =
+          writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
+    }
+
+    // Write the completed delete files into a newly created delete manifest file.
+    if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
+      OutputFile deleteManifestFile = outputFileSupplier.get();
+
+      ManifestWriter<DeleteFile> deleteManifestWriter =
+          ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
+      try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
+        for (DeleteFile deleteFile : result.deleteFiles()) {
+          writer.add(deleteFile);
+        }
+      }
+
+      deleteManifest = deleteManifestWriter.toManifestFile();
+    }
+
+    return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
+  }
+
+  static ManifestFile writeDataFiles(
+      OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) throws IOException {
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+
+    try (ManifestWriter<DataFile> closeableWriter = writer) {
+      closeableWriter.addAll(dataFiles);
+    }
+
+    return writer.toManifestFile();
+  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/SinkCommitter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/SinkCommitter.java
index bfeea694fdb8..c68a11ab1531 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/SinkCommitter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/committer/SinkCommitter.java
@@ -36,7 +36,11 @@
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
@@ -47,6 +51,8 @@
 import org.apache.iceberg.flink.sink.DeltaManifests;
 import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
 import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -133,6 +139,7 @@ public void commit(Collection<CommitRequest<byte[]>> commitRequests)
     if (!tableLoader.isOpen()) {
       tableLoader.open();
     }
+    this.table = tableLoader.loadTable();
 
     this.icebergManifestOutputFileFactory =
         IcebergFlinkManifestUtil.createOutputFileFactory(() -> table, table.properties(), prefix);
@@ -278,9 +285,7 @@ public void commitUpToCheckpoint(
       DeltaManifests deltaManifests =
           SimpleVersionedSerialization.readVersionAndDeSerialize(
               DeltaManifestsSerializer.INSTANCE, e.getValue());
-      pendingResults.put(
-          e.getKey(),
-          IcebergFlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
+      pendingResults.put(e.getKey(), readCompletedFiles(deltaManifests, table.io(), table.specs()));
       manifests.addAll(deltaManifests.manifests());
     }
 
@@ -441,4 +446,33 @@ private void deleteCommittedManifests(
   public void close() {
     // Nothing to do
   }
+
+  public static WriteResult readCompletedFiles(
+      DeltaManifests deltaManifests, FileIO io, Map<Integer, PartitionSpec> specsById)
+      throws IOException {
+    WriteResult.Builder builder = WriteResult.builder();
+
+    // Read the completed data files from persisted data manifest file.
+    if (deltaManifests.dataManifest() != null) {
+      builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io, specsById));
+    }
+
+    // Read the completed delete files from the persisted delete manifest file.
+    if (deltaManifests.deleteManifest() != null) {
+      try (CloseableIterable<DeleteFile> deleteFiles =
+          ManifestFiles.readDeleteManifest(deltaManifests.deleteManifest(), io, specsById)) {
+        builder.addDeleteFiles(deleteFiles);
+      }
+    }
+
+    return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build();
+  }
+
+  static List<DataFile> readDataFiles(
+      ManifestFile manifestFile, FileIO io, Map<Integer, PartitionSpec> specsById)
+      throws IOException {
+    try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io, specsById)) {
+      return Lists.newArrayList(dataFiles);
+    }
+  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/BaseDeltaTaskWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/BaseDeltaTaskWriter.java
deleted file mode 100644
index 0b5705f9d61d..000000000000
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/BaseDeltaTaskWriter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.flink.sink.writer;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionKey;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.RowDataProjection;
-import org.apache.iceberg.io.BaseTaskWriter;
-import org.apache.iceberg.io.FileAppenderFactory;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFileFactory;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
-
-abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
-
-  private final Schema schema;
-  private final Schema deleteSchema;
-  private final RowDataWrapper wrapper;
-  private final RowDataWrapper keyWrapper;
-  private final RowDataProjection keyProjection;
-  private final boolean upsert;
-
-  BaseDeltaTaskWriter(
-      PartitionSpec spec,
-      FileFormat format,
-      FileAppenderFactory<RowData> appenderFactory,
-      OutputFileFactory fileFactory,
-      FileIO io,
-      long targetFileSize,
-      Schema schema,
-      RowType flinkSchema,
-      List<Integer> equalityFieldIds,
-      boolean upsert) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
-    this.schema = schema;
-    this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
-    this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
-    this.keyWrapper =
-        new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
-    this.keyProjection =
-        RowDataProjection.create(flinkSchema, schema.asStruct(), deleteSchema.asStruct());
-    this.upsert = upsert;
-  }
-
-  abstract RowDataDeltaWriter route(RowData row);
-
-  RowDataWrapper wrapper() {
-    return wrapper;
-  }
-
-  @Override
-  public void write(RowData row) throws IOException {
-    RowDataDeltaWriter writer = route(row);
-
-    switch (row.getRowKind()) {
-      case INSERT:
-      case UPDATE_AFTER:
-        if (upsert) {
-          writer.deleteKey(keyProjection.wrap(row));
-        }
-        writer.write(row);
-        break;
-
-      case UPDATE_BEFORE:
-        if (upsert) {
-          break; // UPDATE_BEFORE is not necessary for UPSERT, we do nothing to prevent delete one
-          // row twice
-        }
-        writer.delete(row);
-        break;
-      case DELETE:
-        if (upsert) {
-          writer.deleteKey(keyProjection.wrap(row));
-        } else {
-          writer.delete(row);
-        }
-        break;
-
-      default:
-        throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
-    }
-  }
-
-  protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
-    RowDataDeltaWriter(PartitionKey partition) {
-      super(partition, schema, deleteSchema);
-    }
-
-    @Override
-    protected StructLike asStructLike(RowData data) {
-      return wrapper.wrap(data);
-    }
-
-    @Override
-    protected StructLike asStructLikeKey(RowData data) {
-      return keyWrapper.wrap(data);
-    }
-  }
-}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/PartitionedDeltaWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/PartitionedDeltaWriter.java
index 32bff3b816fd..6f3de1326e76 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/PartitionedDeltaWriter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/PartitionedDeltaWriter.java
@@ -28,6 +28,7 @@
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.sink.BaseDeltaTaskWriter;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
@@ -66,7 +67,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
   }
 
   @Override
-  RowDataDeltaWriter route(RowData row) {
+  protected BaseDeltaTaskWriter.RowDataDeltaWriter route(RowData row) {
     partitionKey.partition(wrapper().wrap(row));
 
     RowDataDeltaWriter writer = writers.get(partitionKey);
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/UnpartitionedDeltaWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/UnpartitionedDeltaWriter.java
index 15b448cc8b67..6e52465ecd65 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/UnpartitionedDeltaWriter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/writer/UnpartitionedDeltaWriter.java
@@ -25,6 +25,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.sink.BaseDeltaTaskWriter;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
@@ -58,7 +59,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
   }
 
   @Override
-  RowDataDeltaWriter route(RowData row) {
+  protected RowDataDeltaWriter route(RowData row) {
     return writer;
   }
 
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergFlinkManifest.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergFlinkManifest.java
index 5656f8e748c0..ae83c9125573 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergFlinkManifest.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergFlinkManifest.java
@@ -19,6 +19,10 @@
 package org.apache.iceberg.flink.sink.committer;
 
 import static org.apache.iceberg.flink.sink.committer.IcebergManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+import static org.apache.iceberg.flink.sink.committer.SinkAggregator.writeCompletedFiles;
+import static org.apache.iceberg.flink.sink.committer.SinkAggregator.writeDataFiles;
+import static org.apache.iceberg.flink.sink.committer.SinkCommitter.readCompletedFiles;
+import static org.apache.iceberg.flink.sink.committer.SinkCommitter.readDataFiles;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
@@ -101,7 +105,7 @@ void testIO() throws IOException {
       List<DeleteFile> eqDeleteFiles = generateEqDeleteFiles(5);
       List<DeleteFile> posDeleteFiles = generatePosDeleteFiles(5);
       DeltaManifests deltaManifests =
-          IcebergFlinkManifestUtil.writeCompletedFiles(
+          writeCompletedFiles(
               WriteResult.builder()
                   .addDataFiles(dataFiles)
                   .addDeleteFiles(eqDeleteFiles)
                   .addDeleteFiles(posDeleteFiles)
                   .build(),
               () -> factory.create(curCkpId),
               table.spec());
 
-      WriteResult result =
-          IcebergFlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
+      WriteResult result = readCompletedFiles(deltaManifests, table.io(), table.specs());
       assertThat(result.deleteFiles()).as("Size of data file list are not equal").hasSize(10);
 
       for (int i = 0; i < dataFiles.size(); i++) {
@@ -138,7 +141,7 @@ public void testUserProvidedManifestLocation() throws IOException {
     List<DataFile> dataFiles = generateDataFiles(5);
     DeltaManifests deltaManifests =
-        IcebergFlinkManifestUtil.writeCompletedFiles(
+        writeCompletedFiles(
             WriteResult.builder().addDataFiles(dataFiles).build(),
             () -> factory.create(checkpointId),
             table.spec());
@@ -150,8 +153,7 @@
         .as("The newly created manifest file should be located under the user provided directory")
         .isEqualTo(Paths.get(deltaManifests.dataManifest().path()).getParent());
 
-    WriteResult result =
-        IcebergFlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
+    WriteResult result = readCompletedFiles(deltaManifests, table.io(), table.specs());
 
     assertThat(result.deleteFiles()).isEmpty();
     assertThat(result.dataFiles()).hasSize(5);
@@ -175,7 +177,7 @@ public void testVersionedSerializer() throws IOException {
       List<DeleteFile> eqDeleteFiles = generateEqDeleteFiles(10);
      List<DeleteFile> posDeleteFiles = generatePosDeleteFiles(10);
       DeltaManifests expected =
-          IcebergFlinkManifestUtil.writeCompletedFiles(
+          writeCompletedFiles(
               WriteResult.builder()
                   .addDataFiles(dataFiles)
                   .addDeleteFiles(eqDeleteFiles)
@@ -207,9 +209,7 @@ public void testCompatibility() throws IOException {
         new IcebergManifestOutputFileFactory(() -> table, table.properties(), "prefix");
 
     List<DataFile> dataFiles = generateDataFiles(10);
-    ManifestFile manifest =
-        IcebergFlinkManifestUtil.writeDataFiles(
-            factory.create(checkpointId), table.spec(), dataFiles);
+    ManifestFile manifest = writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles);
 
     byte[] dataV1 =
         SimpleVersionedSerialization.writeVersionAndSerialize(new V1Serializer(), manifest);
@@ -224,8 +224,7 @@
     assertThat(manifest).isEqualTo(delta.dataManifest());
     TestHelpers.assertEquals(manifest, delta.dataManifest());
 
-    List<DataFile> actualFiles =
-        IcebergFlinkManifestUtil.readDataFiles(delta.dataManifest(), table.io(), table.specs());
+    List<DataFile> actualFiles = readDataFiles(delta.dataManifest(), table.io(), table.specs());
     assertThat(actualFiles).hasSize(10);
 
     for (int i = 0; i < 10; i++) {
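
Reviewer note (illustrative sketch, not part of the patch): the relocated
helpers are meant to pair up across a checkpoint. SinkAggregator condenses a
checkpoint's WriteResult into temporary manifest files, and SinkCommitter
rehydrates the file metadata from those manifests at commit time. The sketch
below assumes a loaded Table, an IcebergManifestOutputFileFactory, and a list
of completed DataFiles are already in scope, mirroring the setup that
TestIcebergFlinkManifest#testIO builds.

    import java.io.IOException;
    import java.util.List;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.sink.DeltaManifests;
    import org.apache.iceberg.flink.sink.committer.IcebergManifestOutputFileFactory;
    import org.apache.iceberg.flink.sink.committer.SinkAggregator;
    import org.apache.iceberg.flink.sink.committer.SinkCommitter;
    import org.apache.iceberg.io.WriteResult;

    class ManifestRoundTripSketch {
      // Round-trips one checkpoint's completed data files through temporary
      // manifest files: the same write/read pairing exercised in testIO().
      static WriteResult roundTrip(
          Table table, IcebergManifestOutputFileFactory factory, List<DataFile> dataFiles)
          throws IOException {
        long checkpointId = 1L;

        // Aggregator side: write the checkpoint's WriteResult out as delta manifests.
        DeltaManifests deltaManifests =
            SinkAggregator.writeCompletedFiles(
                WriteResult.builder().addDataFiles(dataFiles).build(),
                () -> factory.create(checkpointId),
                table.spec());

        // Committer side: read the WriteResult back before committing it to the table.
        return SinkCommitter.readCompletedFiles(deltaManifests, table.io(), table.specs());
      }
    }

Keeping the committable as serialized DeltaManifests keeps Flink checkpoint
state small: only manifest locations travel through the job graph, while the
full data/delete file metadata stays in the manifest files themselves.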