-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduces the new IcebergSink based on the new V2 Flink Sink Abstrac…
…tion (#10179) Co-authored-by: Liwei Li <[email protected]> Co-authored-by: Kyle Bendickson <[email protected]> Co-authored-by: Peter Vary <[email protected]>
- Loading branch information
1 parent
a7398ab
commit bea364c
Showing
23 changed files
with
4,129 additions
and
88 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
95 changes: 95 additions & 0 deletions
95
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg.flink.sink; | ||
|
||
import java.io.Serializable; | ||
import java.util.Arrays; | ||
import java.util.Objects; | ||
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; | ||
|
||
/** | ||
* The aggregated results of a single checkpoint which should be committed. Containing the | ||
* serialized {@link org.apache.iceberg.flink.sink.DeltaManifests} file - which contains the commit | ||
* data, and the jobId, operatorId, checkpointId triplet which helps identifying the specific commit | ||
* | ||
* <p>{@link IcebergCommittableSerializer} is used for serializing the objects between the Writer | ||
* and the Aggregator operator and between the Aggregator and the Committer as well. | ||
*/ | ||
class IcebergCommittable implements Serializable { | ||
private final byte[] manifest; | ||
private final String jobId; | ||
private final String operatorId; | ||
private final long checkpointId; | ||
|
||
IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) { | ||
this.manifest = manifest; | ||
this.jobId = jobId; | ||
this.operatorId = operatorId; | ||
this.checkpointId = checkpointId; | ||
} | ||
|
||
byte[] manifest() { | ||
return manifest; | ||
} | ||
|
||
String jobId() { | ||
return jobId; | ||
} | ||
|
||
String operatorId() { | ||
return operatorId; | ||
} | ||
|
||
Long checkpointId() { | ||
return checkpointId; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return MoreObjects.toStringHelper(this) | ||
.add("jobId", jobId) | ||
.add("checkpointId", checkpointId) | ||
.add("operatorId", operatorId) | ||
.toString(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
|
||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
|
||
IcebergCommittable that = (IcebergCommittable) o; | ||
return checkpointId == that.checkpointId | ||
&& Arrays.equals(manifest, that.manifest) | ||
&& Objects.equals(jobId, that.jobId) | ||
&& Objects.equals(operatorId, that.operatorId); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
int result = Objects.hash(jobId, operatorId, checkpointId); | ||
result = 31 * result + Arrays.hashCode(manifest); | ||
return result; | ||
} | ||
} |
68 changes: 68 additions & 0 deletions
68
...v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg.flink.sink; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import org.apache.flink.core.io.SimpleVersionedSerializer; | ||
import org.apache.flink.core.memory.DataInputDeserializer; | ||
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; | ||
|
||
/** | ||
* This serializer is used for serializing the {@link IcebergCommittable} objects between the Writer | ||
* and the Aggregator operator and between the Aggregator and the Committer as well. | ||
* | ||
* <p>In both cases only the respective part is serialized. | ||
*/ | ||
class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> { | ||
private static final int VERSION = 1; | ||
|
||
@Override | ||
public int getVersion() { | ||
return VERSION; | ||
} | ||
|
||
@Override | ||
public byte[] serialize(IcebergCommittable committable) throws IOException { | ||
ByteArrayOutputStream out = new ByteArrayOutputStream(); | ||
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); | ||
view.writeUTF(committable.jobId()); | ||
view.writeUTF(committable.operatorId()); | ||
view.writeLong(committable.checkpointId()); | ||
view.writeInt(committable.manifest().length); | ||
view.write(committable.manifest()); | ||
return out.toByteArray(); | ||
} | ||
|
||
@Override | ||
public IcebergCommittable deserialize(int version, byte[] serialized) throws IOException { | ||
if (version == 1) { | ||
DataInputDeserializer view = new DataInputDeserializer(serialized); | ||
String jobId = view.readUTF(); | ||
String operatorId = view.readUTF(); | ||
long checkpointId = view.readLong(); | ||
int manifestLen = view.readInt(); | ||
byte[] manifestBuf; | ||
manifestBuf = new byte[manifestLen]; | ||
view.read(manifestBuf); | ||
return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId); | ||
} | ||
throw new IOException("Unrecognized version or corrupt state: " + version); | ||
} | ||
} |
Oops, something went wrong.