
Commit

rm flink classes
ggershinsky committed Dec 21, 2023
1 parent dca0196 commit 7df9db2
Showing 2 changed files with 10 additions and 25 deletions.
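Both files roll back file-level encryption wiring in the Flink module: the appender factory drops its EncryptedOutputFile overload, and the Flink Parquet reader drops its key-metadata handling. For context, a minimal sketch of the bridge the removed overload relied on, assuming the semantics implied by the EncryptionUtil call in the old code (the wrapper class here is illustrative):

import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.OutputFile;

class PlainAsEncrypted {
  // Wraps a plain OutputFile in the EncryptedOutputFile interface without
  // applying encryption, so unencrypted files can pass through
  // encryption-aware code paths (behavior inferred from the method name).
  static EncryptedOutputFile wrap(OutputFile plain) {
    return EncryptionUtil.plainAsEncryptedOutput(plain);
  }
}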
@@ -35,7 +35,6 @@
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
-import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.data.FlinkAvroWriter;
 import org.apache.iceberg.flink.data.FlinkOrcWriter;
@@ -100,18 +99,11 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    return newAppender(EncryptionUtil.plainAsEncryptedOutput(outputFile), format);
-  }
-
-  @Override
-  public FileAppender<RowData> newAppender(
-      EncryptedOutputFile encryptedOutputFile, FileFormat format) {
     MetricsConfig metricsConfig = MetricsConfig.forTable(table);
 
     try {
       switch (format) {
         case AVRO:
-          return Avro.write(encryptedOutputFile)
+          return Avro.write(outputFile)
              .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema))
              .setAll(props)
              .schema(schema)
@@ -120,7 +112,7 @@ public FileAppender<RowData> newAppender(
              .build();
 
         case ORC:
-          return ORC.write(encryptedOutputFile)
+          return ORC.write(outputFile)
              .createWriterFunc(
                  (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema))
              .setAll(props)
@@ -130,7 +122,7 @@ public FileAppender<RowData> newAppender(
              .build();
 
         case PARQUET:
-          return Parquet.write(encryptedOutputFile)
+          return Parquet.write(outputFile)
              .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType))
              .setAll(props)
              .metricsConfig(metricsConfig)
@@ -150,7 +142,7 @@ public FileAppender<RowData> newAppender(
   public DataWriter<RowData> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file, format),
+        newAppender(file.encryptingOutputFile(), format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -172,7 +164,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
     try {
       switch (format) {
         case AVRO:
-          return Avro.writeDeletes(outputFile)
+          return Avro.writeDeletes(outputFile.encryptingOutputFile())
              .createWriterFunc(ignore -> new FlinkAvroWriter(lazyEqDeleteFlinkSchema()))
              .withPartition(partition)
              .overwrite()
@@ -185,7 +177,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
              .buildEqualityWriter();
 
         case ORC:
-          return ORC.writeDeletes(outputFile)
+          return ORC.writeDeletes(outputFile.encryptingOutputFile())
              .createWriterFunc(
                  (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema))
              .withPartition(partition)
@@ -199,7 +191,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
              .buildEqualityWriter();
 
         case PARQUET:
-          return Parquet.writeDeletes(outputFile)
+          return Parquet.writeDeletes(outputFile.encryptingOutputFile())
              .createWriterFunc(
                  msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType))
              .withPartition(partition)
@@ -228,7 +220,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
     try {
       switch (format) {
         case AVRO:
-          return Avro.writeDeletes(outputFile)
+          return Avro.writeDeletes(outputFile.encryptingOutputFile())
              .createWriterFunc(ignore -> new FlinkAvroWriter(lazyPosDeleteFlinkSchema()))
              .withPartition(partition)
              .overwrite()
@@ -242,7 +234,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
         case ORC:
           RowType orcPosDeleteSchema =
               FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return ORC.writeDeletes(outputFile)
+          return ORC.writeDeletes(outputFile.encryptingOutputFile())
              .createWriterFunc(
                  (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(orcPosDeleteSchema, iSchema))
              .withPartition(partition)
@@ -258,7 +250,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
         case PARQUET:
           RowType flinkPosDeleteSchema =
               FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return Parquet.writeDeletes(outputFile)
+          return Parquet.writeDeletes(outputFile.encryptingOutputFile())
              .createWriterFunc(
                  msgType -> FlinkParquetWriters.buildWriter(flinkPosDeleteSchema, msgType))
              .withPartition(partition)
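The net effect in the appender factory above: newAppender takes only a plain OutputFile again, so call sites holding an EncryptedOutputFile unwrap it first, exactly as newDataWriter now does. A minimal sketch of that call pattern against Iceberg's FileAppenderFactory interface (the helper class and method are illustrative):

import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;

class AppenderCallSite {
  // encryptingOutputFile() returns the raw OutputFile that encrypted bytes are
  // physically written to; the factory no longer does this unwrapping itself.
  static FileAppender<RowData> open(
      FileAppenderFactory<RowData> factory, EncryptedOutputFile file, FileFormat format) {
    return factory.newAppender(file.encryptingOutputFile(), format);
  }
}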
@@ -30,7 +30,6 @@
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.encryption.InputFilesDecryptor;
-import org.apache.iceberg.encryption.StandardKeyMetadata;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -183,12 +182,6 @@ private CloseableIterable<RowData> newParquetIterable(
       builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
     }
 
-    if (task.file().keyMetadata() != null) {
-      StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(task.file().keyMetadata());
-      builder.withFileEncryptionKey(keyMetadata.encryptionKey());
-      builder.withAADPrefix(keyMetadata.aadPrefix());
-    }
-
     return builder.build();
   }
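On the read side, removing the key block leaves the Parquet builder without per-file keys, which fits the InputFilesDecryptor import that survives the change: decryption is applied when the input file is resolved, not in the format builder. A minimal sketch of that pattern, assuming Iceberg's public InputFilesDecryptor constructor and Parquet.read (the helper class and variable names are illustrative):

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

class DecryptedRead {
  // The decryptor runs every file in the task group through the table's
  // EncryptionManager up front; getInputFile then hands back an InputFile
  // whose stream is already decrypted, so the builder needs no key wiring.
  static Parquet.ReadBuilder newParquetBuilder(
      CombinedScanTask combinedTask, FileScanTask task, FileIO io, EncryptionManager em) {
    InputFilesDecryptor decryptor = new InputFilesDecryptor(combinedTask, io, em);
    InputFile input = decryptor.getInputFile(task);
    return Parquet.read(input);
  }
}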
