Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduces the new IcebergSink based on the new V2 Flink Sink Abstraction #10179

Merged
merged 25 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c9657d8
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 6, 2024
a58babf
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 6, 2024
04b23ff
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 6, 2024
5f999ea
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 14, 2024
4b8ec1d
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 14, 2024
5afeee5
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 14, 2024
869598e
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 15, 2024
07df5f8
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 15, 2024
bc536ee
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 15, 2024
811e3e3
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 16, 2024
c17dfa7
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 16, 2024
2813669
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 16, 2024
fff6f07
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 16, 2024
8667d36
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 16, 2024
9421170
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 16, 2024
4f1b58b
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 16, 2024
6230830
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 16, 2024
bc586af
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 20, 2024
dfad25f
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 20, 2024
91c01d8
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
rodmeneses Aug 20, 2024
d2a63f9
Introduces the new IcebergSink based on the new V2 Flink Sink Abs…
rodmeneses Aug 25, 2024
a77bb24
Introduces the new IcebergSink based on the new V2 Flink Sink Abs…
rodmeneses Aug 26, 2024
d211136
Introduces the new IcebergSink based on the new V2 Flink Sink Abs…
rodmeneses Aug 26, 2024
71f2893
Fixes spotless issue
rodmeneses Aug 26, 2024
ebc0d79
Changes the uidSuffix used in IceberSink uuid related tests
rodmeneses Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FlinkManifestUtil {

private static final Logger LOG = LoggerFactory.getLogger(FlinkManifestUtil.class);
private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;

Expand Down Expand Up @@ -129,4 +134,26 @@ static WriteResult readCompletedFiles(

return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build();
}

static void deleteCommittedManifests(
Table table, List<ManifestFile> manifests, String newFlinkJobId, long checkpointId) {
for (ManifestFile manifest : manifests) {
try {
table.io().deleteFile(manifest.path());
} catch (Exception e) {
// The flink manifests cleaning failure shouldn't abort the completed checkpoint.
String details =
MoreObjects.toStringHelper(FlinkManifestUtil.class)
.add("tableName", table.name())
.add("flinkJobId", newFlinkJobId)
.add("checkpointId", checkpointId)
.add("manifestPath", manifest.path())
.toString();
LOG.warn(
"The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
details,
e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);

// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
List<Integer> equalityFieldIds =
SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns);

RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
int writerParallelism =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/**
* The aggregated results of a single checkpoint which should be committed. Containing the
* serialized {@link org.apache.iceberg.flink.sink.DeltaManifests} file - which contains the commit
* data, and the jobId, operatorId, checkpointId triplet which helps identifying the specific commit
*
* <p>{@link IcebergCommittableSerializer} is used for serializing the objects between the Writer
* and the Aggregator operator and between the Aggregator and the Committer as well.
*/
class IcebergCommittable implements Serializable {
private final byte[] manifest;
private final String jobId;
private final String operatorId;
private final long checkpointId;

IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) {
this.manifest = manifest;
this.jobId = jobId;
this.operatorId = operatorId;
this.checkpointId = checkpointId;
}

byte[] manifest() {
return manifest;
}

String jobId() {
return jobId;
}

String operatorId() {
return operatorId;
}

Long checkpointId() {
return checkpointId;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("jobId", jobId)
.add("checkpointId", checkpointId)
.add("operatorId", operatorId)
.toString();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

IcebergCommittable that = (IcebergCommittable) o;
return checkpointId == that.checkpointId
&& Arrays.equals(manifest, that.manifest)
&& Objects.equals(jobId, that.jobId)
&& Objects.equals(operatorId, that.operatorId);
}

@Override
public int hashCode() {
int result = Objects.hash(jobId, operatorId, checkpointId);
result = 31 * result + Arrays.hashCode(manifest);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

/**
* This serializer is used for serializing the {@link IcebergCommittable} objects between the Writer
* and the Aggregator operator and between the Aggregator and the Committer as well.
*
* <p>In both cases only the respective part is serialized.
*/
class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
private static final int VERSION = 1;

@Override
public int getVersion() {
return VERSION;
}

@Override
public byte[] serialize(IcebergCommittable committable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
view.writeUTF(committable.jobId());
view.writeUTF(committable.operatorId());
view.writeLong(committable.checkpointId());
view.writeInt(committable.manifest().length);
view.write(committable.manifest());
return out.toByteArray();
}

@Override
public IcebergCommittable deserialize(int version, byte[] serialized) throws IOException {
if (version == 1) {
DataInputDeserializer view = new DataInputDeserializer(serialized);
String jobId = view.readUTF();
String operatorId = view.readUTF();
long checkpointId = view.readLong();
int manifestLen = view.readInt();
byte[] manifestBuf;
manifestBuf = new byte[manifestLen];
view.read(manifestBuf);
return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId);
}
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
Loading