diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 16baefe25c1d..85cc8d902026 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -69,7 +69,7 @@ public class Avro {
 
   private Avro() {}
 
-  public enum Codec {
+  private enum Codec {
     UNCOMPRESSED,
     SNAPPY,
     GZIP,
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 572814d6076d..2f0be89e7e69 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -31,8 +31,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
@@ -173,9 +171,9 @@ public class TableTestBase {
   // the Avro metadata and checking for the Avro name in avro.codec.
   static final Map<String, String> AVRO_CODEC_NAME_MAPPING =
       ImmutableMap.<String, String>builder()
-          .put(Avro.Codec.UNCOMPRESSED.name(), DataFileConstants.NULL_CODEC)
-          .put(Avro.Codec.ZSTD.name(), DataFileConstants.ZSTANDARD_CODEC)
-          .put(Avro.Codec.GZIP.name(), DataFileConstants.DEFLATE_CODEC)
+          .put("uncompressed", "null")
+          .put("zstd", "zstandard")
+          .put("gzip", "deflate")
           .build();
 
   static final long EXAMPLE_SNAPSHOT_ID = 987134631982734L;
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java
index 3d24d45d3863..aea21dfedb12 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java
@@ -24,7 +24,7 @@
 import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.io.InputFile;
 import org.assertj.core.api.Assertions;
-import org.junit.Assume;
+import org.assertj.core.api.Assumptions;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -48,7 +48,7 @@ public void testWriteManifestListWithCompression() throws IOException {
 
   @Test
   public void testWriteDeleteManifestListWithCompression() throws IOException {
-    Assume.assumeTrue("delete files are only written for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
     validateManifestListCompressionCodec(true);
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
index ab19237285be..92be7a83dbe3 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -32,7 +32,6 @@
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assumptions;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -107,7 +106,7 @@ public void testManifestPartitionStats() throws IOException {
 
   @Test
   public void testWriteManifestWithSequenceNumber() throws IOException {
-    Assume.assumeTrue("sequence number is only valid for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
     File manifestFile = temp.newFile("manifest.avro");
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
@@ -132,7 +131,7 @@ public void testWriteManifestWithSequenceNumber() throws IOException {
 
   @Test
   public void testCommitManifestWithExplicitDataSequenceNumber() throws IOException {
-    Assume.assumeTrue("Sequence numbers are valid for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
 
     DataFile file1 = newFile(50);
     DataFile file2 = newFile(50);
@@ -177,7 +176,7 @@ public void testCommitManifestWithExplicitDataSequenceNumber() throws IOExceptio
 
   @Test
   public void testCommitManifestWithExistingEntriesWithoutFileSequenceNumber() throws IOException {
-    Assume.assumeTrue("Sequence numbers are valid for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
 
     DataFile file1 = newFile(50);
     DataFile file2 = newFile(50);
@@ -393,7 +392,7 @@ public void testWriteManifestWithCompression() throws IOException {
 
   @Test
   public void testWriteDeleteManifestWithCompression() throws IOException {
-    Assume.assumeTrue("delete files are only written for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
     validateManifestCompressionCodec(true);
   }
 
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 88b752c3c6dd..8edd55acb08f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -332,7 +332,9 @@ private static Iterator<ManifestFile> buildManifest(
       SerializableConfiguration conf,
       PartitionSpec spec,
       String basePath,
-      Iterator<Tuple2<String, DataFile>> fileTuples) {
+      Iterator<Tuple2<String, DataFile>> fileTuples,
+      String compressionCodec,
+      Integer compressionLevel) {
     if (fileTuples.hasNext()) {
       FileIO io = new HadoopFileIO(conf.get());
       TaskContext ctx = TaskContext.get();
@@ -343,7 +345,8 @@ private static Iterator<ManifestFile> buildManifest(
       Path location = new Path(basePath, suffix);
       String outputPath = FileFormat.AVRO.addExtension(location.toString());
       OutputFile outputFile = io.newOutputFile(outputPath);
-      ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
+      ManifestWriter<DataFile> writer =
+          ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel);
 
       try (ManifestWriter<DataFile> writerRef = writer) {
         fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
@@ -591,7 +594,15 @@ public static void importSparkPartitions(
             .orderBy(col("_1"))
             .mapPartitions(
                 (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
-                    fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
+                    fileTuple ->
+                        buildManifest(
+                            serializableConf,
+                            spec,
+                            stagingDir,
+                            fileTuple,
+                            targetTable.properties().get(TableProperties.AVRO_COMPRESSION),
+                            PropertyUtil.propertyAsNullableInt(
+                                targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
                 Encoders.javaSerialization(ManifestFile.class))
             .collectAsList();
 
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 06a5c8c5720f..5c3f29c75d52 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -240,7 +240,10 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
                     formatVersion,
                     combinedPartitionType,
                     spec,
-                    sparkType),
+                    sparkType,
+                    table.properties().get(TableProperties.AVRO_COMPRESSION),
+                    PropertyUtil.propertyAsNullableInt(
+                        table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
             manifestEncoder)
         .collectAsList();
   }
@@ -270,7 +273,10 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
                         formatVersion,
                         combinedPartitionType,
                         spec,
-                        sparkType),
+                        sparkType,
+                        table.properties().get(TableProperties.AVRO_COMPRESSION),
+                        PropertyUtil.propertyAsNullableInt(
+                            table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
                 manifestEncoder)
             .collectAsList();
       });
@@ -369,7 +375,9 @@ private static ManifestFile writeManifest(
       int format,
       Types.StructType combinedPartitionType,
       PartitionSpec spec,
-      StructType sparkType)
+      StructType sparkType,
+      String compressionCodec,
+      Integer compressionLevel)
       throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
@@ -384,7 +392,8 @@ private static ManifestFile writeManifest(
     Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
     SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
-    ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel);
 
     try {
       for (int index = startIndex; index < endIndex; index++) {
@@ -409,7 +418,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
       int format,
       Types.StructType combinedPartitionType,
       PartitionSpec spec,
-      StructType sparkType) {
+      StructType sparkType,
+      String compressionCodec,
+      Integer compressionLevel) {
 
     return rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -430,7 +441,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
                 format,
                 combinedPartitionType,
                 spec,
-                sparkType));
+                sparkType,
+                compressionCodec,
+                compressionLevel));
       } else {
         int midIndex = rowsAsList.size() / 2;
         manifests.add(
@@ -443,7 +456,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
                 format,
                 combinedPartitionType,
                 spec,
-                sparkType));
+                sparkType,
+                compressionCodec,
+                compressionLevel));
         manifests.add(
             writeManifest(
                 rowsAsList,
@@ -454,7 +469,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
                 format,
                 combinedPartitionType,
                 spec,
-                sparkType));
+                sparkType,
+                compressionCodec,
+                compressionLevel));
       }
 
       return manifests.iterator();
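
Note (not part of the diff): the Spark changes above thread TableProperties.AVRO_COMPRESSION and
TableProperties.AVRO_COMPRESSION_LEVEL from the table into ManifestFiles.write, so rewritten and
imported manifests honor the table's Avro codec settings. A minimal usage sketch follows, assuming
an existing Table handle; the "zstd"/"3" values and the class and method names are illustrative
only, not defaults introduced by this change.

    // Hypothetical usage sketch: configure the table properties that the rewrite
    // action reads, then trigger a manifest rewrite.
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.spark.actions.SparkActions;

    class ManifestCompressionExample {
      static void rewriteWithZstd(Table table) {
        // write.avro.compression-codec / write.avro.compression-level;
        // "zstd" and "3" are example values chosen for illustration.
        table
            .updateProperties()
            .set(TableProperties.AVRO_COMPRESSION, "zstd")
            .set(TableProperties.AVRO_COMPRESSION_LEVEL, "3")
            .commit();

        // RewriteManifestsSparkAction reads the properties above when
        // writing the new manifest files.
        SparkActions.get().rewriteManifests(table).execute();
      }
    }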