From 7df9db2a3bc3bc7dcb17e6bf2c6321983bf32c24 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 21 Dec 2023 17:00:39 +0200 Subject: [PATCH] rm flink classes --- .../flink/sink/FlinkAppenderFactory.java | 28 +++++++------------ .../source/RowDataFileScanTaskReader.java | 7 ----- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index fcbf0b19ba52..b6f1392d1562 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -35,7 +35,6 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.encryption.EncryptionUtil; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.flink.data.FlinkOrcWriter; @@ -100,18 +99,11 @@ private RowType lazyPosDeleteFlinkSchema() { @Override public FileAppender newAppender(OutputFile outputFile, FileFormat format) { - return newAppender(EncryptionUtil.plainAsEncryptedOutput(outputFile), format); - } - - @Override - public FileAppender newAppender( - EncryptedOutputFile encryptedOutputFile, FileFormat format) { MetricsConfig metricsConfig = MetricsConfig.forTable(table); - try { switch (format) { case AVRO: - return Avro.write(encryptedOutputFile) + return Avro.write(outputFile) .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) .setAll(props) .schema(schema) @@ -120,7 +112,7 @@ public FileAppender newAppender( .build(); case ORC: - return ORC.write(encryptedOutputFile) + return ORC.write(outputFile) .createWriterFunc( (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) .setAll(props) @@ -130,7 +122,7 @@ public FileAppender newAppender( .build(); case PARQUET: - return Parquet.write(encryptedOutputFile) + return Parquet.write(outputFile) .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType)) .setAll(props) .metricsConfig(metricsConfig) @@ -150,7 +142,7 @@ public FileAppender newAppender( public DataWriter newDataWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { return new DataWriter<>( - newAppender(file, format), + newAppender(file.encryptingOutputFile(), format), format, file.encryptingOutputFile().location(), spec, @@ -172,7 +164,7 @@ public EqualityDeleteWriter newEqDeleteWriter( try { switch (format) { case AVRO: - return Avro.writeDeletes(outputFile) + return Avro.writeDeletes(outputFile.encryptingOutputFile()) .createWriterFunc(ignore -> new FlinkAvroWriter(lazyEqDeleteFlinkSchema())) .withPartition(partition) .overwrite() @@ -185,7 +177,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case ORC: - return ORC.writeDeletes(outputFile) + return ORC.writeDeletes(outputFile.encryptingOutputFile()) .createWriterFunc( (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) .withPartition(partition) @@ -199,7 +191,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case PARQUET: - return Parquet.writeDeletes(outputFile) + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) .createWriterFunc( msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType)) .withPartition(partition) @@ -228,7 +220,7 @@ public PositionDeleteWriter newPosDeleteWriter( try { switch (format) { case AVRO: - return Avro.writeDeletes(outputFile) + return Avro.writeDeletes(outputFile.encryptingOutputFile()) .createWriterFunc(ignore -> new FlinkAvroWriter(lazyPosDeleteFlinkSchema())) .withPartition(partition) .overwrite() @@ -242,7 +234,7 @@ public PositionDeleteWriter newPosDeleteWriter( case ORC: RowType orcPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema)); - return ORC.writeDeletes(outputFile) + return ORC.writeDeletes(outputFile.encryptingOutputFile()) .createWriterFunc( (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(orcPosDeleteSchema, iSchema)) .withPartition(partition) @@ -258,7 +250,7 @@ public PositionDeleteWriter newPosDeleteWriter( case PARQUET: RowType flinkPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema)); - return Parquet.writeDeletes(outputFile) + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) .createWriterFunc( msgType -> FlinkParquetWriters.buildWriter(flinkPosDeleteSchema, msgType)) .withPartition(partition) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index e719ed08ac8a..88364f4e87b1 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -30,7 +30,6 @@ import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.encryption.InputFilesDecryptor; -import org.apache.iceberg.encryption.StandardKeyMetadata; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.FlinkSchemaUtil; @@ -183,12 +182,6 @@ private CloseableIterable newParquetIterable( builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); } - if (task.file().keyMetadata() != null) { - StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(task.file().keyMetadata()); - builder.withFileEncryptionKey(keyMetadata.encryptionKey()); - builder.withAADPrefix(keyMetadata.aadPrefix()); - } - return builder.build(); }