Skip to content

Commit

Permalink
Introduce ManifestWriter.Options and ManifestListWriter.Options
Browse files Browse the repository at this point in the history
... and have the static methods in ManifestFiles and ManifestLists
for writing use them.
  • Loading branch information
wypoon committed Oct 28, 2023
1 parent 8306e42 commit 041b7a0
Show file tree
Hide file tree
Showing 14 changed files with 215 additions and 125 deletions.
31 changes: 13 additions & 18 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp
*/
public static ManifestWriter<DataFile> write(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
return write(formatVersion, spec, outputFile, snapshotId, null, null);
return write(formatVersion, spec, outputFile, snapshotId, ManifestWriter.options());
}

/**
Expand All @@ -167,24 +167,20 @@ public static ManifestWriter<DataFile> write(
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param compressionCodec compression codec for the manifest file
* @param compressionLevel compression level of the compressionCodec
* @param options additional options for the manifest writer (e.g. compression codec and level)
* @return a manifest writer
*/
public static ManifestWriter<DataFile> write(
int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
String compressionCodec,
Integer compressionLevel) {
ManifestWriter.Options options) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(
spec, outputFile, snapshotId, compressionCodec, compressionLevel);
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId, options);
case 2:
return new ManifestWriter.V2Writer(
spec, outputFile, snapshotId, compressionCodec, compressionLevel);
return new ManifestWriter.V2Writer(spec, outputFile, snapshotId, options);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -221,7 +217,8 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
return writeDeleteManifest(formatVersion, spec, outputFile, snapshotId, null, null);
return writeDeleteManifest(
formatVersion, spec, outputFile, snapshotId, ManifestWriter.options());
}

/**
Expand All @@ -231,23 +228,20 @@ public static ManifestWriter<DeleteFile> writeDeleteManifest(
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param compressionCodec compression codec for the manifest file
* @param compressionLevel compression level of the compressionCodec
* @param options additional options for the manifest writer (e.g. compression codec and level)
* @return a manifest writer
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
String compressionCodec,
Integer compressionLevel) {
ManifestWriter.Options options) {
switch (formatVersion) {
case 1:
throw new IllegalArgumentException("Cannot write delete files in a v1 table");
case 2:
return new ManifestWriter.V2DeleteWriter(
spec, outputFile, snapshotId, compressionCodec, compressionLevel);
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId, options);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -367,8 +361,9 @@ private static ManifestFile copyManifestInternal(
reader.spec(),
outputFile,
snapshotId,
compressionCodec,
compressionLevel);
ManifestWriter.options()
.compressionCodec(compressionCodec)
.compressionLevel(compressionLevel));
boolean threw = true;
try {
for (ManifestEntry<DataFile> entry : reader.entries()) {
Expand Down
71 changes: 44 additions & 27 deletions core/src/main/java/org/apache/iceberg/ManifestListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,14 @@
abstract class ManifestListWriter implements FileAppender<ManifestFile> {
private final FileAppender<ManifestFile> writer;

private ManifestListWriter(
OutputFile file,
Map<String, String> meta,
String compressionCodec,
Integer compressionLevel) {
this.writer = newAppender(file, meta, compressionCodec, compressionLevel);
private ManifestListWriter(OutputFile file, Map<String, String> meta, Options options) {
this.writer = newAppender(file, meta, options);
}

protected abstract ManifestFile prepare(ManifestFile manifest);

protected abstract FileAppender<ManifestFile> newAppender(
OutputFile file, Map<String, String> meta, String compressionCodec, Integer compressionLevel);
OutputFile file, Map<String, String> meta, Options options);

@Override
public void add(ManifestFile manifest) {
Expand Down Expand Up @@ -74,6 +70,35 @@ public long length() {
return writer.length();
}

/**
 * Creates a new {@link Options} holder with neither a compression codec nor a compression level
 * set.
 *
 * @return a fresh options instance that can be configured through its chainable setters
 */
public static Options options() {
  Options fresh = new Options();
  return fresh;
}

/**
 * Chainable holder for the optional settings passed to a manifest list writer.
 *
 * <p>Both settings are {@code null} until explicitly assigned. The constructor is private, so
 * instances are obtained through the enclosing class's {@code options()} factory.
 */
static class Options {
  // Compression codec name; null until compressionCodec(String) is called.
  private String codecName;
  // Compression level; null until compressionLevel(Integer) is called.
  private Integer codecLevel;

  private Options() {}

  /**
   * Stores the compression codec.
   *
   * @param codec the codec to record; stored as given, including {@code null}
   * @return this instance, to allow call chaining
   */
  public Options compressionCodec(String codec) {
    this.codecName = codec;
    return this;
  }

  /**
   * Stores the compression level.
   *
   * @param level the level to record; stored as given, including {@code null}
   * @return this instance, to allow call chaining
   */
  public Options compressionLevel(Integer level) {
    this.codecLevel = level;
    return this;
  }

  /** Returns the stored compression codec, or {@code null} if none was set. */
  String compressionCodec() {
    return this.codecName;
  }

  /** Returns the stored compression level, or {@code null} if none was set. */
  Integer compressionLevel() {
    return this.codecLevel;
  }
}

static class V2Writer extends ManifestListWriter {
private final V2Metadata.IndexedManifestFile wrapper;

Expand All @@ -82,17 +107,15 @@ static class V2Writer extends ManifestListWriter {
long snapshotId,
Long parentSnapshotId,
long sequenceNumber,
String compressionCodec,
Integer compressionLevel) {
Options options) {
super(
snapshotFile,
ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"sequence-number", String.valueOf(sequenceNumber),
"format-version", "2"),
compressionCodec,
compressionLevel);
options);
this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
}

Expand All @@ -103,10 +126,10 @@ protected ManifestFile prepare(ManifestFile manifest) {

@Override
protected FileAppender<ManifestFile> newAppender(
OutputFile file,
Map<String, String> meta,
String compressionCodec,
Integer compressionLevel) {
OutputFile file, Map<String, String> meta, Options options) {
String compressionCodec = options.compressionCodec();
Integer compressionLevel = options.compressionLevel();

try {
Avro.WriteBuilder builder =
Avro.write(file)
Expand All @@ -133,20 +156,14 @@ protected FileAppender<ManifestFile> newAppender(
static class V1Writer extends ManifestListWriter {
private final V1Metadata.IndexedManifestFile wrapper = new V1Metadata.IndexedManifestFile();

V1Writer(
OutputFile snapshotFile,
long snapshotId,
Long parentSnapshotId,
String compressionCodec,
Integer compressionLevel) {
V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, Options options) {
super(
snapshotFile,
ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"format-version", "1"),
compressionCodec,
compressionLevel);
options);
}

@Override
Expand All @@ -159,10 +176,10 @@ protected ManifestFile prepare(ManifestFile manifest) {

@Override
protected FileAppender<ManifestFile> newAppender(
OutputFile file,
Map<String, String> meta,
String compressionCodec,
Integer compressionLevel) {
OutputFile file, Map<String, String> meta, Options options) {
String compressionCodec = options.compressionCodec();
Integer compressionLevel = options.compressionLevel();

try {
Avro.WriteBuilder builder =
Avro.write(file)
Expand Down
19 changes: 9 additions & 10 deletions core/src/main/java/org/apache/iceberg/ManifestLists.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ static ManifestListWriter write(
Long parentSnapshotId,
long sequenceNumber) {
return write(
formatVersion, manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, null, null);
formatVersion,
manifestListFile,
snapshotId,
parentSnapshotId,
sequenceNumber,
ManifestListWriter.options());
}

static ManifestListWriter write(
Expand All @@ -71,24 +76,18 @@ static ManifestListWriter write(
long snapshotId,
Long parentSnapshotId,
long sequenceNumber,
String compressionCodec,
Integer compressionLevel) {
ManifestListWriter.Options options) {
switch (formatVersion) {
case 1:
Preconditions.checkArgument(
sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER,
"Invalid sequence number for v1 manifest list: %s",
sequenceNumber);
return new ManifestListWriter.V1Writer(
manifestListFile, snapshotId, parentSnapshotId, compressionCodec, compressionLevel);
manifestListFile, snapshotId, parentSnapshotId, options);
case 2:
return new ManifestListWriter.V2Writer(
manifestListFile,
snapshotId,
parentSnapshotId,
sequenceNumber,
compressionCodec,
compressionLevel);
manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, options);
}
throw new UnsupportedOperationException(
"Cannot write manifest list for table version: " + formatVersion);
Expand Down
Loading

0 comments on commit 041b7a0

Please sign in to comment.