
Commit

Update Flink 1.17.
wypoon committed Sep 20, 2023
1 parent b0caadf commit 4a2a650
Showing 2 changed files with 44 additions and 4 deletions.
@@ -45,8 +45,20 @@ private FlinkManifestUtil() {}
 
   static ManifestFile writeDataFiles(
       OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) throws IOException {
+    return writeDataFiles(
+        outputFile, spec, dataFiles, /* compressionCodec */ null, /* compressionLevel */ null);
+  }
+
+  static ManifestFile writeDataFiles(
+      OutputFile outputFile,
+      PartitionSpec spec,
+      List<DataFile> dataFiles,
+      String compressionCodec,
+      Integer compressionLevel)
+      throws IOException {
     ManifestWriter<DataFile> writer =
-        ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+        ManifestFiles.write(
+            FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID, compressionCodec, compressionLevel);
 
     try (ManifestWriter<DataFile> closeableWriter = writer) {
       closeableWriter.addAll(dataFiles);
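
The pre-existing two-argument writeDataFiles now delegates to the new overload with null codec and level, so existing callers keep their current behavior while new callers can opt in to explicit compression. A minimal usage sketch under that assumption; the "gzip" codec and level 5 below are illustrative values, not defaults taken from this commit:

    // Hypothetical caller in the same package (not part of this commit):
    // ask for a gzip-compressed data manifest; passing nulls instead would
    // presumably fall back to the manifest writer's existing defaults.
    ManifestFile manifest =
        FlinkManifestUtil.writeDataFiles(outputFile, spec, dataFiles, "gzip", 5);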
@@ -85,22 +97,44 @@ static ManifestOutputFileFactory createOutputFileFactory(
   static DeltaManifests writeCompletedFiles(
       WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
       throws IOException {
+    return writeCompletedFiles(
+        result, outputFileSupplier, spec, /* compressionCodec */ null, /* compressionLevel */ null);
+  }
+
+  static DeltaManifests writeCompletedFiles(
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      PartitionSpec spec,
+      String compressionCodec,
+      Integer compressionLevel)
+      throws IOException {
 
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
 
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
       dataManifest =
-          writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
+          writeDataFiles(
+              outputFileSupplier.get(),
+              spec,
+              Lists.newArrayList(result.dataFiles()),
+              compressionCodec,
+              compressionLevel);
     }
 
     // Write the completed delete files into a newly created delete manifest file.
     if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
       OutputFile deleteManifestFile = outputFileSupplier.get();
 
       ManifestWriter<DeleteFile> deleteManifestWriter =
-          ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
+          ManifestFiles.writeDeleteManifest(
+              FORMAT_V2,
+              spec,
+              deleteManifestFile,
+              DUMMY_SNAPSHOT_ID,
+              compressionCodec,
+              compressionLevel);
       try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
         for (DeleteFile deleteFile : result.deleteFiles()) {
           writer.add(deleteFile);
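
Both the data manifest and the delete manifest produced by writeCompletedFiles receive the same codec and level, so one pair of settings governs every manifest written for a checkpoint. A hedged sketch of calling the new five-argument overload directly; result, outputFileSupplier, and spec stand in for values the caller already holds, and "zstd" with a null level is purely illustrative:

    // Hypothetical call (not part of this commit): write the data and delete
    // manifests for a completed write with zstd; the null level presumably
    // leaves the level at the codec's default.
    DeltaManifests deltaManifests =
        FlinkManifestUtil.writeCompletedFiles(
            result, outputFileSupplier, spec, "zstd", null);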
@@ -47,6 +47,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -449,7 +450,12 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
     WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), spec);
+            result,
+            () -> manifestOutputFileFactory.create(checkpointId),
+            table.spec(),
+            table.properties().get(TableProperties.AVRO_COMPRESSION),
+            PropertyUtil.propertyAsNullableInt(
+                table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
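
With this change the committer forwards the table's configured Avro codec and level to the manifest writers instead of relying on the writer's built-in default. A minimal configuration sketch, assuming a loaded Table handle; the property keys are the TableProperties constants referenced above, while "zstd" and level 3 are illustrative values:

    // Hypothetical setup (not part of this commit): set the write properties
    // that writeToManifest() reads through TableProperties.AVRO_COMPRESSION
    // and TableProperties.AVRO_COMPRESSION_LEVEL.
    table
        .updateProperties()
        .set(TableProperties.AVRO_COMPRESSION, "zstd")
        .set(TableProperties.AVRO_COMPRESSION_LEVEL, "3")
        .commit();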
