
Commit 8306e42
Address further feedback and port changes to Spark 3.5 as well.
wypoon committed Sep 21, 2023
1 parent 4a2a650 commit 8306e42
Showing 6 changed files with 49 additions and 24 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -69,7 +69,7 @@
 public class Avro {
   private Avro() {}
 
-  public enum Codec {
+  private enum Codec {
     UNCOMPRESSED,
     SNAPPY,
     GZIP,
8 changes: 3 additions & 5 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -31,8 +31,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
@@ -173,9 +171,9 @@ public class TableTestBase {
   // the Avro metadata and checking for the Avro name in avro.codec.
   static final Map<String, String> AVRO_CODEC_NAME_MAPPING =
       ImmutableMap.<String, String>builder()
-          .put(Avro.Codec.UNCOMPRESSED.name(), DataFileConstants.NULL_CODEC)
-          .put(Avro.Codec.ZSTD.name(), DataFileConstants.ZSTANDARD_CODEC)
-          .put(Avro.Codec.GZIP.name(), DataFileConstants.DEFLATE_CODEC)
+          .put("uncompressed", "null")
+          .put("zstd", "zstandard")
+          .put("gzip", "deflate")
           .build();
 
   static final long EXAMPLE_SNAPSHOT_ID = 987134631982734L;
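The mapping above keys Iceberg's lowercase codec property values to the names Avro records under the container file's avro.codec metadata ("null", "deflate", "zstandard", and so on). A minimal sketch of how a test can read that metadata back from a written manifest, assuming direct java.io access to the file; the helper name is illustrative, not part of this diff:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Reads the standard "avro.codec" metadata key from an Avro container file;
// its value matches the right-hand side of AVRO_CODEC_NAME_MAPPING.
static String readAvroCodecName(File avroFile) throws IOException {
  try (DataFileStream<GenericRecord> stream =
      new DataFileStream<>(new FileInputStream(avroFile), new GenericDatumReader<>())) {
    return stream.getMetaString("avro.codec");
  }
}

A codec check then reduces to comparing this value against AVRO_CODEC_NAME_MAPPING.get(codecProperty).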
@@ -24,7 +24,7 @@
 import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.io.InputFile;
 import org.assertj.core.api.Assertions;
-import org.junit.Assume;
+import org.assertj.core.api.Assumptions;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -48,7 +48,7 @@ public void testWriteManifestListWithCompression() throws IOException {
 
   @Test
   public void testWriteDeleteManifestListWithCompression() throws IOException {
-    Assume.assumeTrue("delete files are only written for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
     validateManifestListCompressionCodec(true);
   }

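The Assume-to-Assumptions change here (and throughout TestManifestWriter.java below) is behavior-preserving: both forms skip the test when the condition does not hold. The AssertJ form drops the hand-maintained reason string because it derives one from the assertion itself:

// JUnit 4: the reason string must be written and kept accurate by hand.
Assume.assumeTrue("delete files are only written for format version > 1", formatVersion > 1);

// AssertJ: self-describing; the test is skipped whenever formatVersion <= 1.
Assumptions.assumeThat(formatVersion).isGreaterThan(1);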
9 changes: 4 additions & 5 deletions core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -32,7 +32,6 @@
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assumptions;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -107,7 +106,7 @@ public void testManifestPartitionStats() throws IOException {
 
   @Test
   public void testWriteManifestWithSequenceNumber() throws IOException {
-    Assume.assumeTrue("sequence number is only valid for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
     File manifestFile = temp.newFile("manifest.avro");
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
@@ -132,7 +131,7 @@ public void testWriteManifestWithSequenceNumber() throws IOException {
 
   @Test
   public void testCommitManifestWithExplicitDataSequenceNumber() throws IOException {
-    Assume.assumeTrue("Sequence numbers are valid for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
 
     DataFile file1 = newFile(50);
     DataFile file2 = newFile(50);
@@ -177,7 +176,7 @@ public void testCommitManifestWithExplicitDataSequenceNumber() throws IOException {
 
   @Test
   public void testCommitManifestWithExistingEntriesWithoutFileSequenceNumber() throws IOException {
-    Assume.assumeTrue("Sequence numbers are valid for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
 
     DataFile file1 = newFile(50);
     DataFile file2 = newFile(50);
@@ -393,7 +392,7 @@ public void testWriteManifestWithCompression() throws IOException {
 
   @Test
   public void testWriteDeleteManifestWithCompression() throws IOException {
-    Assume.assumeTrue("delete files are only written for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
     validateManifestCompressionCodec(true);
   }

@@ -332,7 +332,9 @@ private static Iterator<ManifestFile> buildManifest(
       SerializableConfiguration conf,
       PartitionSpec spec,
       String basePath,
-      Iterator<Tuple2<String, DataFile>> fileTuples) {
+      Iterator<Tuple2<String, DataFile>> fileTuples,
+      String compressionCodec,
+      Integer compressionLevel) {
     if (fileTuples.hasNext()) {
       FileIO io = new HadoopFileIO(conf.get());
       TaskContext ctx = TaskContext.get();
@@ -343,7 +345,8 @@
       Path location = new Path(basePath, suffix);
       String outputPath = FileFormat.AVRO.addExtension(location.toString());
       OutputFile outputFile = io.newOutputFile(outputPath);
-      ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
+      ManifestWriter<DataFile> writer =
+          ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel);
 
       try (ManifestWriter<DataFile> writerRef = writer) {
         fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
@@ -591,7 +594,15 @@ public static void importSparkPartitions(
         .orderBy(col("_1"))
         .mapPartitions(
             (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
-                fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
+                fileTuple ->
+                    buildManifest(
+                        serializableConf,
+                        spec,
+                        stagingDir,
+                        fileTuple,
+                        targetTable.properties().get(TableProperties.AVRO_COMPRESSION),
+                        PropertyUtil.propertyAsNullableInt(
+                            targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
             Encoders.javaSerialization(ManifestFile.class))
         .collectAsList();

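With these arguments in place, importSparkPartitions writes its import manifests using the table's Avro compression settings rather than the writer defaults. A condensed sketch of the wiring, under the assumption that a null codec or level falls through to the writer's defaults (variable names are illustrative):

// Both lookups return null when the corresponding table property is unset.
String codec = targetTable.properties().get(TableProperties.AVRO_COMPRESSION);
Integer level =
    PropertyUtil.propertyAsNullableInt(
        targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL);

// The overload used above: format version 1, no snapshot id, then the
// compression codec and level.
ManifestWriter<DataFile> writer = ManifestFiles.write(1, spec, outputFile, null, codec, level);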
@@ -240,7 +240,10 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
                 formatVersion,
                 combinedPartitionType,
                 spec,
-                sparkType),
+                sparkType,
+                table.properties().get(TableProperties.AVRO_COMPRESSION),
+                PropertyUtil.propertyAsNullableInt(
+                    table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
             manifestEncoder)
         .collectAsList();
   }
@@ -270,7 +273,10 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
                     formatVersion,
                     combinedPartitionType,
                     spec,
-                    sparkType),
+                    sparkType,
+                    table.properties().get(TableProperties.AVRO_COMPRESSION),
+                    PropertyUtil.propertyAsNullableInt(
+                        table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
                 manifestEncoder)
             .collectAsList();
       });
@@ -369,7 +375,9 @@ private static ManifestFile writeManifest(
       int format,
       Types.StructType combinedPartitionType,
       PartitionSpec spec,
-      StructType sparkType)
+      StructType sparkType,
+      String compressionCodec,
+      Integer compressionLevel)
       throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
@@ -384,7 +392,8 @@
     Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
     SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
-    ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel);
 
     try {
       for (int index = startIndex; index < endIndex; index++) {
@@ -409,7 +418,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
       int format,
       Types.StructType combinedPartitionType,
       PartitionSpec spec,
-      StructType sparkType) {
+      StructType sparkType,
+      String compressionCodec,
+      Integer compressionLevel) {
 
     return rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -430,7 +441,9 @@
                 format,
                 combinedPartitionType,
                 spec,
-                sparkType));
+                sparkType,
+                compressionCodec,
+                compressionLevel));
       } else {
         int midIndex = rowsAsList.size() / 2;
         manifests.add(
@@ -443,7 +456,9 @@
                 format,
                 combinedPartitionType,
                 spec,
-                sparkType));
+                sparkType,
+                compressionCodec,
+                compressionLevel));
         manifests.add(
             writeManifest(
                 rowsAsList,
@@ -454,7 +469,9 @@
                 format,
                 combinedPartitionType,
                 spec,
-                sparkType));
+                sparkType,
+                compressionCodec,
+                compressionLevel));
       }
 
       return manifests.iterator();
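For the rewritten manifests to actually come out compressed with a non-default codec, the table needs the corresponding properties set. A minimal sketch, assuming the standard Iceberg keys behind TableProperties.AVRO_COMPRESSION and TableProperties.AVRO_COMPRESSION_LEVEL:

// "write.avro.compression-codec" and "write.avro.compression-level" are the
// standard keys behind the two TableProperties constants used in this diff.
table.updateProperties()
    .set("write.avro.compression-codec", "zstd")
    .set("write.avro.compression-level", "3")
    .commit();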
