Skip to content

Commit

Permalink
Update Spark 3.4.
Browse files Browse the repository at this point in the history
  • Loading branch information
wypoon committed Sep 20, 2023
1 parent 88613cd commit b0caadf
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,9 @@ private static Iterator<ManifestFile> buildManifest(
SerializableConfiguration conf,
PartitionSpec spec,
String basePath,
Iterator<Tuple2<String, DataFile>> fileTuples) {
Iterator<Tuple2<String, DataFile>> fileTuples,
String compressionCodec,
Integer compressionLevel) {
if (fileTuples.hasNext()) {
FileIO io = new HadoopFileIO(conf.get());
TaskContext ctx = TaskContext.get();
Expand All @@ -343,7 +345,8 @@ private static Iterator<ManifestFile> buildManifest(
Path location = new Path(basePath, suffix);
String outputPath = FileFormat.AVRO.addExtension(location.toString());
OutputFile outputFile = io.newOutputFile(outputPath);
ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
ManifestWriter<DataFile> writer =
ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel);

try (ManifestWriter<DataFile> writerRef = writer) {
fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
Expand Down Expand Up @@ -591,7 +594,15 @@ public static void importSparkPartitions(
.orderBy(col("_1"))
.mapPartitions(
(MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
fileTuple ->
buildManifest(
serializableConf,
spec,
stagingDir,
fileTuple,
targetTable.properties().get(TableProperties.AVRO_COMPRESSION),
PropertyUtil.propertyAsNullableInt(
targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
Encoders.javaSerialization(ManifestFile.class))
.collectAsList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
formatVersion,
combinedPartitionType,
spec,
sparkType),
sparkType,
table.properties().get(TableProperties.AVRO_COMPRESSION),
PropertyUtil.propertyAsNullableInt(
table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
manifestEncoder)
.collectAsList();
}
Expand Down Expand Up @@ -270,7 +273,10 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
formatVersion,
combinedPartitionType,
spec,
sparkType),
sparkType,
table.properties().get(TableProperties.AVRO_COMPRESSION),
PropertyUtil.propertyAsNullableInt(
table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
manifestEncoder)
.collectAsList();
});
Expand Down Expand Up @@ -369,7 +375,9 @@ private static ManifestFile writeManifest(
int format,
Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType)
StructType sparkType,
String compressionCodec,
Integer compressionLevel)
throws IOException {

String manifestName = "optimized-m-" + UUID.randomUUID();
Expand All @@ -384,7 +392,8 @@ private static ManifestFile writeManifest(
Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);

ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
ManifestWriter<DataFile> writer =
ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel);

try {
for (int index = startIndex; index < endIndex; index++) {
Expand All @@ -409,7 +418,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
int format,
Types.StructType combinedPartitionType,
PartitionSpec spec,
StructType sparkType) {
StructType sparkType,
String compressionCodec,
Integer compressionLevel) {

return rows -> {
List<Row> rowsAsList = Lists.newArrayList(rows);
Expand All @@ -430,7 +441,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
format,
combinedPartitionType,
spec,
sparkType));
sparkType,
compressionCodec,
compressionLevel));
} else {
int midIndex = rowsAsList.size() / 2;
manifests.add(
Expand All @@ -443,7 +456,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
format,
combinedPartitionType,
spec,
sparkType));
sparkType,
compressionCodec,
compressionLevel));
manifests.add(
writeManifest(
rowsAsList,
Expand All @@ -454,7 +469,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
format,
combinedPartitionType,
spec,
sparkType));
sparkType,
compressionCodec,
compressionLevel));
}

return manifests.iterator();
Expand Down

0 comments on commit b0caadf

Please sign in to comment.