From 041b7a09aa96fbb5bfd0b502111a47d092cedce9 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Fri, 27 Oct 2023 19:01:30 -0700 Subject: [PATCH] Introduce ManifestWriter.Options and ManifestListWriter.Options ... and have the static methods in ManifestFiles and ManifestLists for writing use them. --- .../org/apache/iceberg/ManifestFiles.java | 31 +++--- .../apache/iceberg/ManifestListWriter.java | 71 ++++++++------ .../org/apache/iceberg/ManifestLists.java | 19 ++-- .../org/apache/iceberg/ManifestWriter.java | 95 +++++++++++-------- .../org/apache/iceberg/SnapshotProducer.java | 29 +++--- .../org/apache/iceberg/TableTestBase.java | 15 ++- .../iceberg/flink/sink/FlinkManifestUtil.java | 13 ++- .../iceberg/flink/sink/FlinkManifestUtil.java | 13 ++- .../apache/iceberg/spark/SparkTableUtil.java | 9 +- .../actions/RewriteManifestsSparkAction.java | 9 +- .../apache/iceberg/spark/SparkTableUtil.java | 9 +- .../actions/RewriteManifestsSparkAction.java | 9 +- .../apache/iceberg/spark/SparkTableUtil.java | 9 +- .../actions/RewriteManifestsSparkAction.java | 9 +- 14 files changed, 215 insertions(+), 125 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index eaa7c9075a4e..ee08261edbce 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -157,7 +157,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outp */ public static ManifestWriter write( int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { - return write(formatVersion, spec, outputFile, snapshotId, null, null); + return write(formatVersion, spec, outputFile, snapshotId, ManifestWriter.options()); } /** @@ -167,8 +167,7 @@ public static ManifestWriter write( * @param spec a {@link PartitionSpec} * @param outputFile an {@link OutputFile} where the manifest will be written * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID - * @param compressionCodec compression codec for the manifest file - * @param compressionLevel compression level of the compressionCodec + * @param options additional options for the manifest writer * @return a manifest writer */ public static ManifestWriter write( @@ -176,15 +175,12 @@ public static ManifestWriter write( PartitionSpec spec, OutputFile outputFile, Long snapshotId, - String compressionCodec, - Integer compressionLevel) { + ManifestWriter.Options options) { switch (formatVersion) { case 1: - return new ManifestWriter.V1Writer( - spec, outputFile, snapshotId, compressionCodec, compressionLevel); + return new ManifestWriter.V1Writer(spec, outputFile, snapshotId, options); case 2: - return new ManifestWriter.V2Writer( - spec, outputFile, snapshotId, compressionCodec, compressionLevel); + return new ManifestWriter.V2Writer(spec, outputFile, snapshotId, options); } throw new UnsupportedOperationException( "Cannot write manifest for table version: " + formatVersion); @@ -221,7 +217,8 @@ public static ManifestReader readDeleteManifest( */ public static ManifestWriter writeDeleteManifest( int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { - return writeDeleteManifest(formatVersion, spec, outputFile, snapshotId, null, null); + return writeDeleteManifest( + formatVersion, spec, outputFile, snapshotId, ManifestWriter.options()); } /** @@ -231,8 +228,7 @@ public static ManifestWriter writeDeleteManifest( * @param spec a {@link PartitionSpec} * @param outputFile an {@link OutputFile} where the manifest will be written * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID - * @param compressionCodec compression codec for the manifest file - * @param compressionLevel compression level of the compressionCodec + * @param options additional options for the manifest writer * @return a manifest writer */ public static ManifestWriter writeDeleteManifest( @@ -240,14 +236,12 @@ public static ManifestWriter writeDeleteManifest( PartitionSpec spec, OutputFile outputFile, Long snapshotId, - String compressionCodec, - Integer compressionLevel) { + ManifestWriter.Options options) { switch (formatVersion) { case 1: throw new IllegalArgumentException("Cannot write delete files in a v1 table"); case 2: - return new ManifestWriter.V2DeleteWriter( - spec, outputFile, snapshotId, compressionCodec, compressionLevel); + return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId, options); } throw new UnsupportedOperationException( "Cannot write manifest for table version: " + formatVersion); @@ -367,8 +361,9 @@ private static ManifestFile copyManifestInternal( reader.spec(), outputFile, snapshotId, - compressionCodec, - compressionLevel); + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); boolean threw = true; try { for (ManifestEntry entry : reader.entries()) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index e0b98e473631..7e8bcd9d6ee3 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -31,18 +31,14 @@ abstract class ManifestListWriter implements FileAppender { private final FileAppender writer; - private ManifestListWriter( - OutputFile file, - Map meta, - String compressionCodec, - Integer compressionLevel) { - this.writer = newAppender(file, meta, compressionCodec, compressionLevel); + private ManifestListWriter(OutputFile file, Map meta, Options options) { + this.writer = newAppender(file, meta, options); } protected abstract ManifestFile prepare(ManifestFile manifest); protected abstract FileAppender newAppender( - OutputFile file, Map meta, String compressionCodec, Integer compressionLevel); + OutputFile file, Map meta, Options options); @Override public void add(ManifestFile manifest) { @@ -74,6 +70,35 @@ public long length() { return writer.length(); } + public static Options options() { + return new Options(); + } + + static class Options { + private String compCodec; + private Integer compLevel; + + private Options() {} + + public Options compressionCodec(String codec) { + compCodec = codec; + return this; + } + + public Options compressionLevel(Integer level) { + compLevel = level; + return this; + } + + String compressionCodec() { + return compCodec; + } + + Integer compressionLevel() { + return compLevel; + } + } + static class V2Writer extends ManifestListWriter { private final V2Metadata.IndexedManifestFile wrapper; @@ -82,8 +107,7 @@ static class V2Writer extends ManifestListWriter { long snapshotId, Long parentSnapshotId, long sequenceNumber, - String compressionCodec, - Integer compressionLevel) { + Options options) { super( snapshotFile, ImmutableMap.of( @@ -91,8 +115,7 @@ static class V2Writer extends ManifestListWriter { "parent-snapshot-id", String.valueOf(parentSnapshotId), "sequence-number", String.valueOf(sequenceNumber), "format-version", "2"), - compressionCodec, - compressionLevel); + options); this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber); } @@ -103,10 +126,10 @@ protected ManifestFile prepare(ManifestFile manifest) { @Override protected FileAppender newAppender( - OutputFile file, - Map meta, - String compressionCodec, - Integer compressionLevel) { + OutputFile file, Map meta, Options options) { + String compressionCodec = options.compressionCodec(); + Integer compressionLevel = options.compressionLevel(); + try { Avro.WriteBuilder builder = Avro.write(file) @@ -133,20 +156,14 @@ protected FileAppender newAppender( static class V1Writer extends ManifestListWriter { private final V1Metadata.IndexedManifestFile wrapper = new V1Metadata.IndexedManifestFile(); - V1Writer( - OutputFile snapshotFile, - long snapshotId, - Long parentSnapshotId, - String compressionCodec, - Integer compressionLevel) { + V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, Options options) { super( snapshotFile, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), "format-version", "1"), - compressionCodec, - compressionLevel); + options); } @Override @@ -159,10 +176,10 @@ protected ManifestFile prepare(ManifestFile manifest) { @Override protected FileAppender newAppender( - OutputFile file, - Map meta, - String compressionCodec, - Integer compressionLevel) { + OutputFile file, Map meta, Options options) { + String compressionCodec = options.compressionCodec(); + Integer compressionLevel = options.compressionLevel(); + try { Avro.WriteBuilder builder = Avro.write(file) diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java index a92743d847d0..7d39dc68ea11 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestLists.java +++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java @@ -62,7 +62,12 @@ static ManifestListWriter write( Long parentSnapshotId, long sequenceNumber) { return write( - formatVersion, manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, null, null); + formatVersion, + manifestListFile, + snapshotId, + parentSnapshotId, + sequenceNumber, + ManifestListWriter.options()); } static ManifestListWriter write( @@ -71,8 +76,7 @@ static ManifestListWriter write( long snapshotId, Long parentSnapshotId, long sequenceNumber, - String compressionCodec, - Integer compressionLevel) { + ManifestListWriter.Options options) { switch (formatVersion) { case 1: Preconditions.checkArgument( @@ -80,15 +84,10 @@ static ManifestListWriter write( "Invalid sequence number for v1 manifest list: %s", sequenceNumber); return new ManifestListWriter.V1Writer( - manifestListFile, snapshotId, parentSnapshotId, compressionCodec, compressionLevel); + manifestListFile, snapshotId, parentSnapshotId, options); case 2: return new ManifestListWriter.V2Writer( - manifestListFile, - snapshotId, - parentSnapshotId, - sequenceNumber, - compressionCodec, - compressionLevel); + manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, options); } throw new UnsupportedOperationException( "Cannot write manifest list for table version: " + formatVersion); diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index b56f71719773..1fd87c78179c 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -52,15 +52,10 @@ public abstract class ManifestWriter> implements FileAp private long deletedRows = 0L; private Long minDataSequenceNumber = null; - private ManifestWriter( - PartitionSpec spec, - OutputFile file, - Long snapshotId, - String compressionCodec, - Integer compressionLevel) { + private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId, Options options) { this.file = file; this.specId = spec.specId(); - this.writer = newAppender(spec, file, compressionCodec, compressionLevel); + this.writer = newAppender(spec, file, options); this.snapshotId = snapshotId; this.reused = new GenericManifestEntry<>(spec.partitionType()); this.stats = new PartitionSummary(spec); @@ -69,18 +64,15 @@ private ManifestWriter( protected abstract ManifestEntry prepare(ManifestEntry entry); /** - * @deprecated since 1.4.0, will be removed in 1.5.0; use {@link - * ManifestWriter#newAppender(PartitionSpec, OutputFile, String, Integer)} instead. + * @deprecated since 1.5.0, will be removed in 1.6.0; use {@link + * ManifestWriter#newAppender(PartitionSpec, OutputFile, Options)} instead. */ @Deprecated protected abstract FileAppender> newAppender( PartitionSpec spec, OutputFile outputFile); protected FileAppender> newAppender( - PartitionSpec spec, - OutputFile outputFile, - String compressionCodec, - Integer compressionLevel) { + PartitionSpec spec, OutputFile outputFile, Options options) { return newAppender(spec, outputFile); } @@ -231,16 +223,40 @@ public void close() throws IOException { writer.close(); } + public static Options options() { + return new Options(); + } + + public static class Options { + private String compCodec; + private Integer compLevel; + + private Options() {} + + public Options compressionCodec(String codec) { + compCodec = codec; + return this; + } + + public Options compressionLevel(Integer level) { + compLevel = level; + return this; + } + + String compressionCodec() { + return compCodec; + } + + Integer compressionLevel() { + return compLevel; + } + } + static class V2Writer extends ManifestWriter { private final V2Metadata.IndexedManifestEntry entryWrapper; - V2Writer( - PartitionSpec spec, - OutputFile file, - Long snapshotId, - String compressionCodec, - Integer compressionLevel) { - super(spec, file, snapshotId, compressionCodec, compressionLevel); + V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId, Options options) { + super(spec, file, snapshotId, options); this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); } @@ -252,13 +268,16 @@ protected ManifestEntry prepare(ManifestEntry entry) { @Override protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { - return newAppender(spec, file, null, null); + return newAppender(spec, file, options()); } @Override protected FileAppender> newAppender( - PartitionSpec spec, OutputFile file, String compressionCodec, Integer compressionLevel) { + PartitionSpec spec, OutputFile file, Options options) { Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); + String compressionCodec = options.compressionCodec(); + Integer compressionLevel = options.compressionLevel(); + try { Avro.WriteBuilder builder = Avro.write(file) @@ -289,13 +308,8 @@ protected FileAppender> newAppender( static class V2DeleteWriter extends ManifestWriter { private final V2Metadata.IndexedManifestEntry entryWrapper; - V2DeleteWriter( - PartitionSpec spec, - OutputFile file, - Long snapshotId, - String compressionCodec, - Integer compressionLevel) { - super(spec, file, snapshotId, compressionCodec, compressionLevel); + V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId, Options options) { + super(spec, file, snapshotId, options); this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); } @@ -307,13 +321,16 @@ protected ManifestEntry prepare(ManifestEntry entry) { @Override protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { - return newAppender(spec, file, null, null); + return newAppender(spec, file, options()); } @Override protected FileAppender> newAppender( - PartitionSpec spec, OutputFile file, String compressionCodec, Integer compressionLevel) { + PartitionSpec spec, OutputFile file, Options options) { Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); + String compressionCodec = options.compressionCodec(); + Integer compressionLevel = options.compressionLevel(); + try { Avro.WriteBuilder builder = Avro.write(file) @@ -349,13 +366,8 @@ protected ManifestContent content() { static class V1Writer extends ManifestWriter { private final V1Metadata.IndexedManifestEntry entryWrapper; - V1Writer( - PartitionSpec spec, - OutputFile file, - Long snapshotId, - String compressionCodec, - Integer compressionLevel) { - super(spec, file, snapshotId, compressionCodec, compressionLevel); + V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId, Options options) { + super(spec, file, snapshotId, options); this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType()); } @@ -367,13 +379,16 @@ protected ManifestEntry prepare(ManifestEntry entry) { @Override protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { - return newAppender(spec, file, null, null); + return newAppender(spec, file, options()); } @Override protected FileAppender> newAppender( - PartitionSpec spec, OutputFile file, String compressionCodec, Integer compressionLevel) { + PartitionSpec spec, OutputFile file, Options options) { Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType()); + String compressionCodec = options.compressionCodec(); + Integer compressionLevel = options.compressionLevel(); + try { Avro.WriteBuilder builder = Avro.write(file) diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 981a6bb0a5da..ff55642b9a49 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -227,6 +227,10 @@ public Snapshot apply() { OutputFile manifestList = manifestListPath(); + ManifestListWriter.Options options = + ManifestListWriter.options() + .compressionCodec(base.properties().get(TableProperties.AVRO_COMPRESSION)) + .compressionLevel(base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); try (ManifestListWriter writer = ManifestLists.write( base.formatVersion(), @@ -234,8 +238,7 @@ public Snapshot apply() { snapshotId(), parentSnapshotId, sequenceNumber, - base.properties().get(TableProperties.AVRO_COMPRESSION), - base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL))) { + options)) { // keep track of the manifest lists created manifestLists.add(manifestList.location()); @@ -501,23 +504,21 @@ protected OutputFile newManifestOutput() { } protected ManifestWriter newManifestWriter(PartitionSpec spec) { + ManifestWriter.Options options = + ManifestWriter.options() + .compressionCodec(base.properties().get(TableProperties.AVRO_COMPRESSION)) + .compressionLevel(base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); return ManifestFiles.write( - base.formatVersion(), - spec, - newManifestOutput(), - snapshotId(), - base.properties().get(TableProperties.AVRO_COMPRESSION), - base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); + base.formatVersion(), spec, newManifestOutput(), snapshotId(), options); } protected ManifestWriter newDeleteManifestWriter(PartitionSpec spec) { + ManifestWriter.Options options = + ManifestWriter.options() + .compressionCodec(base.properties().get(TableProperties.AVRO_COMPRESSION)) + .compressionLevel(base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); return ManifestFiles.writeDeleteManifest( - base.formatVersion(), - spec, - newManifestOutput(), - snapshotId(), - base.properties().get(TableProperties.AVRO_COMPRESSION), - base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); + base.formatVersion(), spec, newManifestOutput(), snapshotId(), options); } protected RollingManifestWriter newRollingManifestWriter(PartitionSpec spec) { diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 2f0be89e7e69..1b6b707e27fb 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -267,7 +267,11 @@ ManifestFile writeManifest(Long snapshotId, String compressionCodec, DataFile... ManifestWriter writer = ManifestFiles.write( - formatVersion, table.spec(), outputFile, snapshotId, compressionCodec, null); + formatVersion, + table.spec(), + outputFile, + snapshotId, + ManifestWriter.options().compressionCodec(compressionCodec)); try { for (DataFile file : files) { writer.add(file); @@ -329,7 +333,11 @@ ManifestFile writeDeleteManifest( FileFormat.AVRO.addExtension(temp.newFile().toString())); ManifestWriter writer = ManifestFiles.writeDeleteManifest( - newFormatVersion, SPEC, manifestFile, snapshotId, compressionCodec, null); + newFormatVersion, + SPEC, + manifestFile, + snapshotId, + ManifestWriter.options().compressionCodec(compressionCodec)); try { for (DeleteFile deleteFile : deleteFiles) { writer.add(deleteFile); @@ -355,8 +363,7 @@ InputFile writeManifestList(String compressionCodec, ManifestFile... manifestFil EXAMPLE_SNAPSHOT_ID, EXAMPLE_SNAPSHOT_ID - 1, formatVersion > 1 ? 34L : 0, - compressionCodec, - null)) { + ManifestListWriter.options().compressionCodec(compressionCodec))) { for (ManifestFile manifestFile : manifestFiles) { writer.add(manifestFile); } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 07afe292013d..01a778ffbe3b 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -58,7 +58,13 @@ static ManifestFile writeDataFiles( throws IOException { ManifestWriter writer = ManifestFiles.write( - FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID, compressionCodec, compressionLevel); + FORMAT_V2, + spec, + outputFile, + DUMMY_SNAPSHOT_ID, + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try (ManifestWriter closeableWriter = writer) { closeableWriter.addAll(dataFiles); @@ -133,8 +139,9 @@ static DeltaManifests writeCompletedFiles( spec, deleteManifestFile, DUMMY_SNAPSHOT_ID, - compressionCodec, - compressionLevel); + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try (ManifestWriter writer = deleteManifestWriter) { for (DeleteFile deleteFile : result.deleteFiles()) { writer.add(deleteFile); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 07afe292013d..01a778ffbe3b 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -58,7 +58,13 @@ static ManifestFile writeDataFiles( throws IOException { ManifestWriter writer = ManifestFiles.write( - FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID, compressionCodec, compressionLevel); + FORMAT_V2, + spec, + outputFile, + DUMMY_SNAPSHOT_ID, + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try (ManifestWriter closeableWriter = writer) { closeableWriter.addAll(dataFiles); @@ -133,8 +139,9 @@ static DeltaManifests writeCompletedFiles( spec, deleteManifestFile, DUMMY_SNAPSHOT_ID, - compressionCodec, - compressionLevel); + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try (ManifestWriter writer = deleteManifestWriter) { for (DeleteFile deleteFile : result.deleteFiles()) { writer.add(deleteFile); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 79144eacd1e1..2177cfd3e958 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -383,7 +383,14 @@ private static Iterator buildManifest( String outputPath = FileFormat.AVRO.addExtension(location.toString()); OutputFile outputFile = io.newOutputFile(outputPath); ManifestWriter writer = - ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel); + ManifestFiles.write( + 1, + spec, + outputFile, + null, + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try (ManifestWriter writerRef = writer) { fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2)); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 5c3f29c75d52..45c1d1ed2fcf 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -393,7 +393,14 @@ private static ManifestFile writeManifest( SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType); ManifestWriter writer = - ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel); + ManifestFiles.write( + format, + spec, + outputFile, + null, + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try { for (int index = startIndex; index < endIndex; index++) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 8edd55acb08f..7ce4ee0c45cc 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -346,7 +346,14 @@ private static Iterator buildManifest( String outputPath = FileFormat.AVRO.addExtension(location.toString()); OutputFile outputFile = io.newOutputFile(outputPath); ManifestWriter writer = - ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel); + ManifestFiles.write( + 1, + spec, + outputFile, + null, + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try (ManifestWriter writerRef = writer) { fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2)); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 5c3f29c75d52..45c1d1ed2fcf 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -393,7 +393,14 @@ private static ManifestFile writeManifest( SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType); ManifestWriter writer = - ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel); + ManifestFiles.write( + format, + spec, + outputFile, + null, + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try { for (int index = startIndex; index < endIndex; index++) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 8edd55acb08f..7ce4ee0c45cc 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -346,7 +346,14 @@ private static Iterator buildManifest( String outputPath = FileFormat.AVRO.addExtension(location.toString()); OutputFile outputFile = io.newOutputFile(outputPath); ManifestWriter writer = - ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel); + ManifestFiles.write( + 1, + spec, + outputFile, + null, + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try (ManifestWriter writerRef = writer) { fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2)); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 5c3f29c75d52..45c1d1ed2fcf 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -393,7 +393,14 @@ private static ManifestFile writeManifest( SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType); ManifestWriter writer = - ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel); + ManifestFiles.write( + format, + spec, + outputFile, + null, + ManifestWriter.options() + .compressionCodec(compressionCodec) + .compressionLevel(compressionLevel)); try { for (int index = startIndex; index < endIndex; index++) {