A new implementation of an Iceberg Sink [WIP] that will be used with upcoming Flink Compaction jobs
rodmeneses committed May 6, 2024
1 parent 756bbbd commit a1e4c52
Showing 12 changed files with 143 additions and 290 deletions.
@@ -38,7 +38,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;

-abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
+public abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {

private final Schema schema;
private final Schema deleteSchema;
@@ -47,7 +47,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final RowDataProjection keyProjection;
private final boolean upsert;

-BaseDeltaTaskWriter(
+protected BaseDeltaTaskWriter(
PartitionSpec spec,
FileFormat format,
FileAppenderFactory<RowData> appenderFactory,
@@ -69,9 +69,9 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
this.upsert = upsert;
}

-abstract RowDataDeltaWriter route(RowData row);
+protected abstract RowDataDeltaWriter route(RowData row);

-RowDataWrapper wrapper() {
+protected RowDataWrapper wrapper() {
return wrapper;
}

@@ -109,7 +109,8 @@ public void write(RowData row) throws IOException {
}

protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
-RowDataDeltaWriter(PartitionKey partition) {
+
+public RowDataDeltaWriter(PartitionKey partition) {
super(partition, schema, deleteSchema, DeleteGranularity.FILE);
}

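Taken together, the changes above widen BaseDeltaTaskWriter's surface from package-private to protected/public, which lets writer implementations live outside the sink package (presumably for the upcoming Flink Compaction jobs named in the commit message; the PartitionedDeltaWriter and UnpartitionedDeltaWriter hunks below show the real overrides). A minimal sketch of what an external subclass could look like. CompactionDeltaWriter is a hypothetical name, the constructor parameters follow the upstream BaseDeltaTaskWriter signature (only partly visible in this diff), and the import paths assume the upstream package layout, which this commit may relocate:

import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.sink.BaseDeltaTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;

// Hypothetical external subclass: the simplest (unpartitioned) shape, routing every
// row to a single delta writer. This only compiles outside the package because the
// class, its constructor, route(), wrapper(), and RowDataDeltaWriter are no longer
// package-private.
public class CompactionDeltaWriter extends BaseDeltaTaskWriter {
  private final RowDataDeltaWriter writer;

  public CompactionDeltaWriter(
      PartitionSpec spec,
      FileFormat format,
      FileAppenderFactory<RowData> appenderFactory,
      OutputFileFactory fileFactory,
      FileIO io,
      long targetFileSize,
      Schema schema,
      RowType flinkSchema,
      List<Integer> equalityFieldIds,
      boolean upsert) {
    super(spec, format, appenderFactory, fileFactory, io, targetFileSize,
        schema, flinkSchema, equalityFieldIds, upsert);
    // Unpartitioned: one writer with a null partition key handles every row.
    this.writer = new RowDataDeltaWriter(null);
  }

  @Override
  protected RowDataDeltaWriter route(RowData row) {
    return writer;
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
}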
@@ -61,6 +61,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
@@ -82,7 +83,6 @@
import org.apache.iceberg.flink.sink.writer.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -768,4 +768,23 @@ public static IcebergSink.Builder forRowData(DataStream<RowData> input) {
static String prefixIfNotNull(String uidPrefix, String suffix) {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}

+/**
+ * A table supplier that always returns the same {@link SerializableTable} captured when the
+ * job was submitted. It never reloads the table, so writer tasks do not observe catalog-side
+ * metadata changes made after submission.
+ */
+static class SimpleTableSupplier implements SerializableSupplier<Table> {
+private final SerializableTable table;
+
+SimpleTableSupplier(SerializableTable initialTable) {
+Preconditions.checkArgument(initialTable != null, "initialTable cannot be null");
+this.table = initialTable;
+}
+
+@Override
+public Table get() {
+return table;
+}
+}
}
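SimpleTableSupplier pins a single SerializableTable for the life of the job, so, unlike an interval-based reloading supplier, it never goes back to the catalog after construction. A hedged sketch of how it might be wired up inside the sink (the enclosing class and these variable and method names are illustrative, not from this commit; it assumes the TableLoader has already been opened):

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.util.SerializableSupplier;

// Illustrative helper: capture one serializable snapshot of the table and pin it.
static SerializableSupplier<Table> pinnedSupplier(TableLoader tableLoader) {
  Table table = tableLoader.loadTable();
  // SerializableTable.copyOf produces a self-contained, serializable snapshot.
  SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
  return new SimpleTableSupplier(serializableTable);
}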
@@ -66,7 +66,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
}

@Override
-RowDataDeltaWriter route(RowData row) {
+protected RowDataDeltaWriter route(RowData row) {
partitionKey.partition(wrapper().wrap(row));

RowDataDeltaWriter writer = writers.get(partitionKey);

This file was deleted.

@@ -58,7 +58,7 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
}

@Override
-RowDataDeltaWriter route(RowData row) {
+protected RowDataDeltaWriter route(RowData row) {
return writer;
}

@@ -18,110 +18,16 @@
*/
package org.apache.iceberg.flink.sink.committer;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class IcebergFlinkManifestUtil {
private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;

private IcebergFlinkManifestUtil() {}

static ManifestFile writeDataFiles(
OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) throws IOException {
ManifestWriter<DataFile> writer =
ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);

try (ManifestWriter<DataFile> closeableWriter = writer) {
closeableWriter.addAll(dataFiles);
}

return writer.toManifestFile();
}

static List<DataFile> readDataFiles(
ManifestFile manifestFile, FileIO io, Map<Integer, PartitionSpec> specsById)
throws IOException {
try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io, specsById)) {
return Lists.newArrayList(dataFiles);
}
}

public static IcebergManifestOutputFileFactory createOutputFileFactory(
Supplier<Table> tableSupplier, Map<String, String> props, String prefix) {
return new IcebergManifestOutputFileFactory(tableSupplier, props, prefix);
}

/**
* Write the {@link WriteResult} to temporary manifest files.
*
* @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
* partition spec
*/
public static DeltaManifests writeCompletedFiles(
WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
throws IOException {

ManifestFile dataManifest = null;
ManifestFile deleteManifest = null;

// Write the completed data files into a newly created data manifest file.
if (result.dataFiles() != null && result.dataFiles().length > 0) {
dataManifest =
writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
}

// Write the completed delete files into a newly created delete manifest file.
if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
OutputFile deleteManifestFile = outputFileSupplier.get();

ManifestWriter<DeleteFile> deleteManifestWriter =
ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
for (DeleteFile deleteFile : result.deleteFiles()) {
writer.add(deleteFile);
}
}

deleteManifest = deleteManifestWriter.toManifestFile();
}

return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}

public static WriteResult readCompletedFiles(
DeltaManifests deltaManifests, FileIO io, Map<Integer, PartitionSpec> specsById)
throws IOException {
WriteResult.Builder builder = WriteResult.builder();

// Read the completed data files from persisted data manifest file.
if (deltaManifests.dataManifest() != null) {
builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io, specsById));
}

// Read the completed delete files from persisted delete manifests file.
if (deltaManifests.deleteManifest() != null) {
try (CloseableIterable<DeleteFile> deleteFiles =
ManifestFiles.readDeleteManifest(deltaManifests.deleteManifest(), io, specsById)) {
builder.addDeleteFiles(deleteFiles);
}
}

return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build();
}
}
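This commit guts the utility: the write-side helpers (writeCompletedFiles, writeDataFiles) reappear in SinkAggregator below, while readCompletedFiles is not re-added in any of the files shown here. What the pair gives the committer is a round trip between DeltaManifests and checkpoint-safe bytes; a minimal sketch of that round trip, using only the serializer this commit keeps:

import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;

// Round-trip sketch: DeltaManifests -> versioned bytes -> DeltaManifests.
// This is the shape of the state the aggregator hands to the committer.
static DeltaManifests roundTrip(DeltaManifests manifests) throws IOException {
  byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(
      DeltaManifestsSerializer.INSTANCE, manifests);
  return SimpleVersionedSerialization.readVersionAndDeSerialize(
      DeltaManifestsSerializer.INSTANCE, bytes);
}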
@@ -20,18 +20,28 @@

import java.io.IOException;
import java.util.Collection;
+import java.util.List;
+import java.util.function.Supplier;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +64,9 @@ public class SinkAggregator extends AbstractStreamOperator<CommittableMessage<Si

private transient boolean initialized = false;

+private static final int FORMAT_V2 = 2;
+private static final Long DUMMY_SNAPSHOT_ID = 0L;
+
public SinkAggregator(TableLoader tableLoader, String prefix) {
this.results = Sets.newHashSet();
this.tableLoader = tableLoader;
@@ -120,10 +133,59 @@ public byte[] writeToManifest(Collection<WriteResult> writeResults, long checkpo

WriteResult result = WriteResult.builder().addAll(writeResults).build();
DeltaManifests deltaManifests =
-IcebergFlinkManifestUtil.writeCompletedFiles(
+writeCompletedFiles(
result, () -> icebergManifestOutputFileFactory.create(checkpointId), table.spec());

return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
}

+/**
+ * Write the {@link WriteResult} to temporary manifest files.
+ *
+ * @param result the WriteResult to persist; all of its DataFiles/DeleteFiles must have been
+ * written with the same partition spec
+ */
+public static DeltaManifests writeCompletedFiles(
+WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+throws IOException {
+
+ManifestFile dataManifest = null;
+ManifestFile deleteManifest = null;
+
+// Write the completed data files into a newly created data manifest file.
+if (result.dataFiles() != null && result.dataFiles().length > 0) {
+dataManifest =
+writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
+}
+
+// Write the completed delete files into a newly created delete manifest file.
+if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
+OutputFile deleteManifestFile = outputFileSupplier.get();
+
+ManifestWriter<DeleteFile> deleteManifestWriter =
+ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
+try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
+for (DeleteFile deleteFile : result.deleteFiles()) {
+writer.add(deleteFile);
+}
+}
+
+deleteManifest = deleteManifestWriter.toManifestFile();
+}
+
+return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
+}
+
+static ManifestFile writeDataFiles(
+OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) throws IOException {
+ManifestWriter<DataFile> writer =
+ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+
+try (ManifestWriter<DataFile> closeableWriter = writer) {
+closeableWriter.addAll(dataFiles);
+}
+
+return writer.toManifestFile();
+}
}
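Because writeCompletedFiles is now public and static on SinkAggregator, it can be exercised directly, for example from a test. A hedged sketch; table, dataFiles, and the single-use output file are stand-ins for the manifest output file factory used above:

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.WriteResult;

// Illustrative only: persist a WriteResult as temporary manifest files, the same
// step writeToManifest() performs before serializing state for the committer.
static DeltaManifests persist(Table table, List<DataFile> dataFiles, OutputFile outputFile)
    throws IOException {
  WriteResult result = WriteResult.builder().addDataFiles(dataFiles).build();
  // The spec must match the one the data files were written with.
  return SinkAggregator.writeCompletedFiles(result, () -> outputFile, table.spec());
}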