diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
index 1686342c776d..e1abd644846f 100644
--- a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
+++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
@@ -37,4 +37,9 @@ public interface EncryptedOutputFile {
    * #encryptingOutputFile()}.
    */
   EncryptionKeyMetadata keyMetadata();
+
+  /** Underlying output file for native encryption. */
+  default OutputFile rawOutputFile() {
+    return null;
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptionKeyMetadata.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptionKeyMetadata.java
index 1ce1c337a809..d8088d7c441b 100644
--- a/api/src/main/java/org/apache/iceberg/encryption/EncryptionKeyMetadata.java
+++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptionKeyMetadata.java
@@ -49,4 +49,12 @@ static EncryptionKeyMetadata empty() {
   ByteBuffer buffer();
 
   EncryptionKeyMetadata copy();
+
+  default ByteBuffer encryptionKey() {
+    return null;
+  }
+
+  default ByteBuffer aadPrefix() {
+    return null;
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
index c0fc41ca1385..912104a5305d 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
@@ -57,5 +57,9 @@ public static EncryptedOutputFile encryptedOutput(
         encryptedOutputFile, BaseEncryptionKeyMetadata.fromByteArray(keyMetadata));
   }
 
+  public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) {
+    return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty());
+  }
+
   private EncryptedFiles() {}
 }
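Note: the three defaults above are the crux of the API change, since they let key material travel with the output file instead of being resolved later. A minimal sketch of the intended call pattern (the `manager` and `rawOutput` names are hypothetical, not part of this diff):

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.OutputFile;

class NativeEncryptionSketch {
  // Decide where the format writer should write, based on the new accessors.
  static OutputFile chooseTarget(EncryptionManager manager, OutputFile rawOutput) {
    EncryptedOutputFile encrypted = manager.encrypt(rawOutput);
    EncryptionKeyMetadata keyMetadata = encrypted.keyMetadata();
    ByteBuffer key = keyMetadata.encryptionKey(); // null unless native encryption is configured
    if (key != null) {
      // Native (Parquet modular) encryption: the format writer encrypts footer and
      // pages itself, so it writes to the raw file using the key and aadPrefix().
      return encrypted.rawOutputFile();
    }
    // Stream-level encryption or plaintext: write through the encrypting stream.
    return encrypted.encryptingOutputFile();
  }
}
```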
diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
new file mode 100644
index 000000000000..aa1e1509c09e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.encryption;
+
+import java.nio.ByteBuffer;
+import org.apache.iceberg.common.DynConstructors;
+
+public class EncryptionUtil {
+
+  private EncryptionUtil() {}
+
+  public static EncryptionKeyMetadata parseKeyMetadata(ByteBuffer metadataBuffer) {
+    return KeyMetadata.parse(metadataBuffer);
+  }
+
+  public static EncryptionKeyMetadata createKeyMetadata(ByteBuffer key, ByteBuffer aadPrefix) {
+    return new KeyMetadata(key, aadPrefix);
+  }
+
+  static KeyManagementClient createKmsClient(String kmsImpl) {
+    DynConstructors.Ctor<KeyManagementClient> ctor;
+    try {
+      ctor = DynConstructors.builder(KeyManagementClient.class).impl(kmsImpl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize KeyManagementClient, missing no-arg constructor for class %s",
+              kmsImpl),
+          e);
+    }
+
+    try {
+      return ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize kms client, %s does not implement KeyManagementClient interface",
+              kmsImpl),
+          e);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java
index 4d8d8aa7aff9..1ba42adc8d66 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java
@@ -18,18 +18,22 @@
  */
 package org.apache.iceberg.encryption;
 
-import java.nio.ByteBuffer;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PlaintextEncryptionManager implements EncryptionManager {
+  private static final EncryptionManager INSTANCE = new PlaintextEncryptionManager();
   private static final Logger LOG = LoggerFactory.getLogger(PlaintextEncryptionManager.class);
 
+  public static EncryptionManager instance() {
+    return INSTANCE;
+  }
+
   @Override
   public InputFile decrypt(EncryptedInputFile encrypted) {
-    if (encrypted.keyMetadata().buffer() != null) {
+    if (encrypted.keyMetadata() != null && encrypted.keyMetadata().buffer() != null) {
       LOG.warn(
           "File encryption key metadata is present, but currently using PlaintextEncryptionManager.");
     }
@@ -38,6 +42,6 @@ public InputFile decrypt(EncryptedInputFile encrypted) {
 
   @Override
   public EncryptedOutputFile encrypt(OutputFile rawOutput) {
-    return EncryptedFiles.encryptedOutput(rawOutput, (ByteBuffer) null);
+    return EncryptedFiles.encryptedOutput(rawOutput, EncryptionKeyMetadata.empty());
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
index 59b0b4b3bf6a..82b531c225c2 100644
--- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
+++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
@@ -40,6 +40,10 @@ public interface FileAppenderFactory<T> {
    */
   FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);
 
+  default FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
+    return newAppender(outputFile.encryptingOutputFile(), fileFormat);
+  }
+
   /**
    * Create a new {@link DataWriter}.
    *
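Note: the singleton accessor and the new appender overload work together so write paths can pass the `EncryptedOutputFile` through intact. A hedged usage sketch (the `io`, `factory`, and file path are assumptions):

```java
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;

class AppenderOverloadSketch {
  static FileAppender<Record> open(FileIO io, FileAppenderFactory<Record> factory) {
    // PlaintextEncryptionManager is stateless, so instance() avoids an allocation per call.
    EncryptionManager em = PlaintextEncryptionManager.instance();
    EncryptedOutputFile out = em.encrypt(io.newOutputFile("/tmp/data-file.parquet"));
    // Factories that support native encryption override this overload; the default
    // above unwraps to the stream-encrypting file, so existing factories keep working.
    return factory.newAppender(out, FileFormat.PARQUET);
  }
}
```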
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index 976b98b0a9fe..9092a82c484c 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -93,7 +93,6 @@ protected BaseFileWriterFactory(
   @Override
   public DataWriter<T> newDataWriter(
       EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
-    OutputFile outputFile = file.encryptingOutputFile();
     EncryptionKeyMetadata keyMetadata = file.keyMetadata();
     Map<String, String> properties = table.properties();
     MetricsConfig metricsConfig = MetricsConfig.forTable(table);
@@ -102,7 +101,7 @@ public DataWriter<T> newDataWriter(
     switch (dataFileFormat) {
       case AVRO:
         Avro.DataWriteBuilder avroBuilder =
-            Avro.writeData(outputFile)
+            Avro.writeData(file.encryptingOutputFile())
                 .schema(dataSchema)
                 .setAll(properties)
                 .metricsConfig(metricsConfig)
@@ -118,13 +117,15 @@ public DataWriter<T> newDataWriter(
 
       case PARQUET:
         Parquet.DataWriteBuilder parquetBuilder =
-            Parquet.writeData(outputFile)
+            Parquet.writeData(file.rawOutputFile())
                 .schema(dataSchema)
                 .setAll(properties)
                 .metricsConfig(metricsConfig)
                 .withSpec(spec)
                 .withPartition(partition)
                 .withKeyMetadata(keyMetadata)
+                .withFileEncryptionKey(keyMetadata.encryptionKey())
+                .withAADPrefix(keyMetadata.aadPrefix())
                 .withSortOrder(dataSortOrder)
                 .overwrite();
 
@@ -134,7 +135,7 @@ public DataWriter<T> newDataWriter(
 
       case ORC:
         ORC.DataWriteBuilder orcBuilder =
-            ORC.writeData(outputFile)
+            ORC.writeData(file.encryptingOutputFile())
                 .schema(dataSchema)
                 .setAll(properties)
                 .metricsConfig(metricsConfig)
@@ -194,6 +195,8 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
                 .withSpec(spec)
                 .withPartition(partition)
                 .withKeyMetadata(keyMetadata)
+                .withFileEncryptionKey(keyMetadata.encryptionKey())
+                .withAADPrefix(keyMetadata.aadPrefix())
                 .withSortOrder(equalityDeleteSortOrder)
                 .overwrite();
 
@@ -261,6 +264,8 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
                 .withSpec(spec)
                 .withPartition(partition)
                 .withKeyMetadata(keyMetadata)
+                .withFileEncryptionKey(keyMetadata.encryptionKey())
+                .withAADPrefix(keyMetadata.aadPrefix())
                 .overwrite();
 
     configurePositionDelete(parquetBuilder);
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index a7979fd2ed3e..8f5ff6ecf127 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -36,6 +36,8 @@
 import org.apache.iceberg.deletes.DeleteCounter;
 import org.apache.iceberg.deletes.Deletes;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -293,6 +295,13 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
           builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
         }
 
+        if (deleteFile.keyMetadata() != null) {
+          EncryptionKeyMetadata keyMetadata =
+              EncryptionUtil.parseKeyMetadata(deleteFile.keyMetadata());
+          builder.withFileEncryptionKey(keyMetadata.encryptionKey());
+          builder.withAADPrefix(keyMetadata.aadPrefix());
+        }
+
         return builder.build();
 
       case ORC:
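Note: the same read-side pattern recurs in every reader touched below (GenericReader, the Flink and MR readers, the Spark readers): if the content file carries key metadata, parse it and hand the key plus AAD prefix to the Parquet read builder. Isolated here for clarity, as a hedged sketch (`inputFile`, `schema`, and `file` are assumptions):

```java
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

class DecryptingReadSketch {
  static Parquet.ReadBuilder configure(InputFile inputFile, Schema schema, ContentFile<?> file) {
    Parquet.ReadBuilder builder = Parquet.read(inputFile).project(schema);
    if (file.keyMetadata() != null) {
      // keyMetadata() is the serialized form persisted in the manifest entry.
      EncryptionKeyMetadata keyMetadata = EncryptionUtil.parseKeyMetadata(file.keyMetadata());
      builder.withFileEncryptionKey(keyMetadata.encryptionKey());
      builder.withAADPrefix(keyMetadata.aadPrefix());
    }
    return builder;
  }
}
```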
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
index 23a94ebc9944..0bb081e031d8 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
@@ -32,7 +32,9 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFile;
@@ -84,11 +86,17 @@ public GenericAppenderFactory setAll(Map<String, String> properties) {
 
   @Override
   public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
+    return newAppender(
+        EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.empty()), fileFormat);
+  }
+
+  @Override
+  public FileAppender<Record> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
     try {
       switch (fileFormat) {
         case AVRO:
-          return Avro.write(outputFile)
+          return Avro.write(outputFile.encryptingOutputFile())
               .schema(schema)
               .createWriterFunc(DataWriter::create)
               .metricsConfig(metricsConfig)
@@ -97,16 +105,18 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
               .build();
 
         case PARQUET:
-          return Parquet.write(outputFile)
+          return Parquet.write(outputFile.rawOutputFile())
               .schema(schema)
               .createWriterFunc(GenericParquetWriter::buildWriter)
               .setAll(config)
               .metricsConfig(metricsConfig)
               .overwrite()
+              .withFileEncryptionKey(outputFile.keyMetadata().encryptionKey())
+              .withAADPrefix(outputFile.keyMetadata().aadPrefix())
               .build();
 
         case ORC:
-          return ORC.write(outputFile)
+          return ORC.write(outputFile.encryptingOutputFile())
               .schema(schema)
               .createWriterFunc(GenericOrcWriter::buildWriter)
               .setAll(config)
@@ -127,7 +137,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
   public org.apache.iceberg.io.DataWriter<Record> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new org.apache.iceberg.io.DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -183,6 +193,8 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
             .rowSchema(eqDeleteRowSchema)
             .withSpec(spec)
             .withKeyMetadata(file.keyMetadata())
+            .withFileEncryptionKey(file.keyMetadata().encryptionKey())
+            .withAADPrefix(file.keyMetadata().aadPrefix())
             .equalityFieldIds(equalityFieldIds)
             .buildEqualityWriter();
 
@@ -233,6 +245,8 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
             .rowSchema(posDeleteRowSchema)
             .withSpec(spec)
             .withKeyMetadata(file.keyMetadata())
+            .withFileEncryptionKey(file.keyMetadata().encryptionKey())
+            .withAADPrefix(file.keyMetadata().aadPrefix())
             .buildPositionWriter();
 
       default:
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
index 3637bf00bd58..5733cda1910b 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -29,6 +29,8 @@
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -125,6 +127,13 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProjection) {
           parquet.reuseContainers();
         }
 
+        if (task.file().keyMetadata() != null) {
+          EncryptionKeyMetadata keyMetadata =
+              EncryptionUtil.parseKeyMetadata(task.file().keyMetadata());
+          parquet.withFileEncryptionKey(keyMetadata.encryptionKey());
+          parquet.withAADPrefix(keyMetadata.aadPrefix());
+        }
+
         return parquet.build();
 
       case ORC:
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index b6f1392d1562..1b2f65bb83ba 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -34,7 +34,9 @@
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.data.FlinkAvroWriter;
 import org.apache.iceberg.flink.data.FlinkOrcWriter;
@@ -99,11 +101,17 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
+    return newAppender(
+        EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.empty()), format);
+  }
+
+  @Override
+  public FileAppender<RowData> newAppender(EncryptedOutputFile outputFile, FileFormat format) {
     MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
-          return Avro.write(outputFile)
+          return Avro.write(outputFile.encryptingOutputFile())
               .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema))
               .setAll(props)
               .schema(schema)
@@ -112,7 +120,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
               .build();
 
         case ORC:
-          return ORC.write(outputFile)
+          return ORC.write(outputFile.encryptingOutputFile())
               .createWriterFunc(
                   (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema))
               .setAll(props)
@@ -122,12 +130,14 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
               .build();
 
         case PARQUET:
-          return Parquet.write(outputFile)
+          return Parquet.write(outputFile.rawOutputFile())
               .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType))
               .setAll(props)
               .metricsConfig(metricsConfig)
               .schema(schema)
               .overwrite()
+              .withFileEncryptionKey(outputFile.keyMetadata().encryptionKey())
+              .withAADPrefix(outputFile.keyMetadata().aadPrefix())
               .build();
 
         default:
@@ -142,7 +152,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
   public DataWriter<RowData> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -201,6 +211,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
             .rowSchema(eqDeleteRowSchema)
             .withSpec(spec)
             .withKeyMetadata(outputFile.keyMetadata())
+            .withFileEncryptionKey(outputFile.keyMetadata().encryptionKey())
+            .withAADPrefix(outputFile.keyMetadata().aadPrefix())
             .equalityFieldIds(equalityFieldIds)
             .buildEqualityWriter();
 
@@ -260,6 +272,8 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
             .rowSchema(posDeleteRowSchema)
             .withSpec(spec)
             .withKeyMetadata(outputFile.keyMetadata())
+            .withFileEncryptionKey(outputFile.keyMetadata().encryptionKey())
+            .withAADPrefix(outputFile.keyMetadata().aadPrefix())
             .transformPaths(path -> StringData.fromString(path.toString()))
             .buildPositionWriter();
 
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 5fada27d5471..18c2f7c49818 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -28,6 +28,8 @@
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
@@ -161,6 +163,13 @@ private CloseableIterable<RowData> newParquetIterable(
       builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
     }
 
+    if (task.file().keyMetadata() != null) {
+      EncryptionKeyMetadata keyMetadata =
+          EncryptionUtil.parseKeyMetadata(task.file().keyMetadata());
+      builder.withFileEncryptionKey(keyMetadata.encryptionKey());
+      builder.withAADPrefix(keyMetadata.aadPrefix());
+    }
+
     return builder.build();
   }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index a95454b8b0ee..b2bfa1399ca7 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -56,8 +56,9 @@
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -302,10 +303,7 @@ public void close() throws IOException {
 
     private CloseableIterable<T> openTask(FileScanTask currentTask, Schema readSchema) {
       DataFile file = currentTask.file();
-      InputFile inputFile =
-          encryptionManager.decrypt(
-              EncryptedFiles.encryptedInput(
-                  io.newInputFile(file.path().toString()), file.keyMetadata()));
+      InputFile inputFile = io.newInputFile(file.path().toString());
 
       CloseableIterable<T> iterable;
       switch (file.format()) {
@@ -419,6 +417,14 @@ private CloseableIterable<T> newParquetIterable(
       if (nameMapping != null) {
         parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
+
+      if (task.file().keyMetadata() != null) {
+        EncryptionKeyMetadata keyMetadata =
+            EncryptionUtil.parseKeyMetadata(task.file().keyMetadata());
+        parquetReadBuilder.withFileEncryptionKey(keyMetadata.encryptionKey());
+        parquetReadBuilder.withAADPrefix(keyMetadata.aadPrefix());
+      }
+
       parquetReadBuilder.createReaderFunc(
           fileSchema ->
               GenericParquetReaders.buildReader(
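Note: on the Spark side, the key metadata is threaded through the reader base classes rather than parsed at each call site. Roughly what a call site looks like after this change, as a fragment-level sketch (mirrors BatchDataReader below; `getInputFile`, `idToConstant`, and `deleteFilter` are members of the surrounding reader and are assumed here):

```java
// Inside a hypothetical BaseBatchReader subclass:
protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
  InputFile inputFile = getInputFile(task.file().path().toString());
  return newBatchIterable(
          inputFile,
          task.file().keyMetadata(), // null for plaintext files; the ORC branch requires null
          task.file().format(),
          task.start(),
          task.length(),
          task.residual(),
          idToConstant,
          deleteFilter)
      .iterator();
}
```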
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index d206bb8e2b5b..58d4cd371af3 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -26,11 +27,14 @@
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
@@ -52,6 +56,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
 
   protected CloseableIterable<ColumnarBatch> newBatchIterable(
       InputFile inputFile,
+      ByteBuffer keyMetadata,
       FileFormat format,
       long start,
       long length,
@@ -60,9 +65,12 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
       SparkDeleteFilter deleteFilter) {
     switch (format) {
       case PARQUET:
-        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
+        return newParquetIterable(
+            inputFile, keyMetadata, start, length, residual, idToConstant, deleteFilter);
 
       case ORC:
+        Preconditions.checkState(
+            keyMetadata == null, "Encryption currently not supported with ORC format");
         return newOrcIterable(inputFile, start, length, residual, idToConstant);
 
       default:
@@ -73,6 +81,7 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
 
   private CloseableIterable<ColumnarBatch> newParquetIterable(
       InputFile inputFile,
+      ByteBuffer encryptionKeyMetadata,
       long start,
       long length,
       Expression residual,
@@ -81,6 +90,14 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
 
+    ByteBuffer fileEncryptionKey = null;
+    ByteBuffer aadPrefix = null;
+    if (encryptionKeyMetadata != null) {
+      EncryptionKeyMetadata keyMetadata = EncryptionUtil.parseKeyMetadata(encryptionKeyMetadata);
+      fileEncryptionKey = keyMetadata.encryptionKey();
+      aadPrefix = keyMetadata.aadPrefix();
+    }
+
     return Parquet.read(inputFile)
         .project(requiredSchema)
         .split(start, length)
@@ -96,6 +113,8 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
         // read performance as every batch read doesn't have to pay the cost of allocating memory.
         .reuseContainers()
         .withNameMapping(nameMapping())
+        .withFileEncryptionKey(fileEncryptionKey)
+        .withAADPrefix(aadPrefix)
         .build();
   }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index 608f0df0075d..19fbd7587e70 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -26,6 +27,8 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -46,6 +49,7 @@ abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
 
   protected CloseableIterable<InternalRow> newIterable(
       InputFile file,
+      ByteBuffer encryptionKeyMetadata,
       FileFormat format,
       long start,
       long length,
@@ -54,7 +58,8 @@ protected CloseableIterable<InternalRow> newIterable(
       Map<Integer, ?> idToConstant) {
     switch (format) {
       case PARQUET:
-        return newParquetIterable(file, start, length, residual, projection, idToConstant);
+        return newParquetIterable(
+            file, encryptionKeyMetadata, start, length, residual, projection, idToConstant);
 
       case AVRO:
         return newAvroIterable(file, start, length, projection, idToConstant);
@@ -80,11 +85,20 @@ private CloseableIterable<InternalRow> newAvroIterable(
 
   private CloseableIterable<InternalRow> newParquetIterable(
       InputFile file,
+      ByteBuffer encryptionKeyMetadata,
       long start,
       long length,
       Expression residual,
       Schema readSchema,
       Map<Integer, ?> idToConstant) {
+    ByteBuffer fileEncryptionKey = null;
+    ByteBuffer aadPrefix = null;
+    if (encryptionKeyMetadata != null) {
+      EncryptionKeyMetadata keyMetadata = EncryptionUtil.parseKeyMetadata(encryptionKeyMetadata);
+      fileEncryptionKey = keyMetadata.encryptionKey();
+      aadPrefix = keyMetadata.aadPrefix();
+    }
+
     return Parquet.read(file)
         .reuseContainers()
         .split(start, length)
@@ -94,6 +108,8 @@ private CloseableIterable<InternalRow> newParquetIterable(
         .filter(residual)
         .caseSensitive(caseSensitive())
         .withNameMapping(nameMapping())
+        .withFileEncryptionKey(fileEncryptionKey)
+        .withAADPrefix(aadPrefix)
         .build();
   }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index e087fde3f8db..00a4e2bcc8b2 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -97,6 +97,7 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
 
     return newBatchIterable(
             inputFile,
+            task.file().keyMetadata(),
             task.file().format(),
             task.start(),
             task.length(),
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 41d8f62c57cc..a7b45b23df6b 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -125,6 +125,7 @@ private CloseableIterable<InternalRow> rows(ContentScanTask<?> task, Schema readSchema) {
     Preconditions.checkNotNull(location, "Could not find InputFile");
     return newIterable(
         location,
+        task.file().keyMetadata(),
         task.file().format(),
         task.start(),
         task.length(),
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 3729df930cfe..4588ede374df 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -102,6 +102,7 @@ protected CloseableIterable<InternalRow> open(
         inputFile, "Could not find InputFile associated with FileScanTask");
     return newIterable(
         inputFile,
+        task.file().keyMetadata(),
         task.file().format(),
         task.start(),
         task.length(),
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 6372edde0782..b89e8e52ff36 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.DataWriter;
@@ -161,21 +162,28 @@ private StructType lazyPosDeleteSparkType() {
   }
 
   @Override
-  public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+  public FileAppender<InternalRow> newAppender(OutputFile outputFile, FileFormat format) {
+    return newAppender(EncryptedFiles.plainAsEncryptedOutput(outputFile), format);
+  }
+
+  @Override
+  public FileAppender<InternalRow> newAppender(EncryptedOutputFile file, FileFormat fileFormat) {
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
     try {
       switch (fileFormat) {
         case PARQUET:
-          return Parquet.write(file)
+          return Parquet.write(file.rawOutputFile())
               .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType))
               .setAll(properties)
               .metricsConfig(metricsConfig)
               .schema(writeSchema)
               .overwrite()
+              .withFileEncryptionKey(file.keyMetadata().encryptionKey())
+              .withAADPrefix(file.keyMetadata().aadPrefix())
               .build();
 
         case AVRO:
-          return Avro.write(file)
+          return Avro.write(file.encryptingOutputFile())
               .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema))
               .setAll(properties)
               .schema(writeSchema)
@@ -183,7 +191,7 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
               .build();
 
         case ORC:
-          return ORC.write(file)
+          return ORC.write(file.encryptingOutputFile())
               .createWriterFunc(SparkOrcWriter::new)
               .setAll(properties)
               .metricsConfig(metricsConfig)
@@ -203,7 +211,7 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
   public DataWriter<InternalRow> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -233,6 +241,8 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(
             .withPartition(partition)
             .equalityFieldIds(equalityFieldIds)
             .withKeyMetadata(file.keyMetadata())
+            .withFileEncryptionKey(file.keyMetadata().encryptionKey())
+            .withAADPrefix(file.keyMetadata().aadPrefix())
             .buildEqualityWriter();
 
       case AVRO:
@@ -282,6 +292,8 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(
             .withSpec(spec)
             .withPartition(partition)
             .withKeyMetadata(file.keyMetadata())
+            .withFileEncryptionKey(file.keyMetadata().encryptionKey())
+            .withAADPrefix(file.keyMetadata().aadPrefix())
             .transformPaths(path -> UTF8String.fromString(path.toString()))
             .buildPositionWriter();
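Note: for review convenience, an end-to-end sketch of the write/read round trip these changes enable. This is a sketch under assumptions: `table`, `path`, and the persisted `dataFile` are placeholders, and resource closing is omitted.

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;

class RoundTripSketch {
  static void roundTrip(Table table, String path, DataFile dataFile) {
    // Write: the factory picks rawOutputFile() plus key/AAD for Parquet internally.
    EncryptedOutputFile out = table.encryption().encrypt(table.io().newOutputFile(path));
    FileAppender<Record> appender =
        new GenericAppenderFactory(table.schema()).newAppender(out, FileFormat.PARQUET);

    // Read: parse the key metadata that was persisted in the manifest entry.
    EncryptionKeyMetadata km = EncryptionUtil.parseKeyMetadata(dataFile.keyMetadata());
    CloseableIterable<Record> records =
        Parquet.read(table.io().newInputFile(path))
            .project(table.schema())
            .createReaderFunc(type -> GenericParquetReaders.buildReader(table.schema(), type))
            .withFileEncryptionKey(km.encryptionKey())
            .withAADPrefix(km.aadPrefix())
            .build();
  }
}
```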