diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java index 300c88d18862..1686342c776d 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java @@ -37,9 +37,4 @@ public interface EncryptedOutputFile { * #encryptingOutputFile()}. */ EncryptionKeyMetadata keyMetadata(); - - /** Underlying output file for native encryption. */ - default OutputFile plainOutputFile() { - throw new UnsupportedOperationException("Not implemented"); - } } diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index afc502e20d96..6aa3241c3660 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -55,6 +55,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; @@ -92,6 +93,13 @@ public static WriteBuilder write(OutputFile file) { return new WriteBuilder(file); } + public static WriteBuilder write(EncryptedOutputFile file) { + Preconditions.checkState( + file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, + "Avro encryption is not supported"); + return new WriteBuilder(file.encryptingOutputFile()); + } + public static class WriteBuilder { private final OutputFile file; private final Map config = Maps.newHashMap(); @@ -273,6 +281,13 @@ public static DataWriteBuilder writeData(OutputFile file) { return new DataWriteBuilder(file); } + public static DataWriteBuilder writeData(EncryptedOutputFile file) { + Preconditions.checkState( + file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, + "Avro encryption is not supported"); + return new DataWriteBuilder(file.encryptingOutputFile()); + } + public static class DataWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; @@ -369,6 +384,13 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) { return new DeleteWriteBuilder(file); } + public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) { + Preconditions.checkState( + file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, + "Avro encryption is not supported"); + return new DeleteWriteBuilder(file.encryptingOutputFile()); + } + public static class DeleteWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index ad1deecb8da1..e2cf98bf767f 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -20,9 +20,9 @@ import java.util.Map; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.PropertyUtil; @@ -80,17 +80,6 @@ public static EncryptionManager createEncryptionManager( return PlaintextEncryptionManager.instance(); } - String fileFormat = - PropertyUtil.propertyAsString( - tableProperties, - TableProperties.DEFAULT_FILE_FORMAT, - TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); - - if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { - throw new UnsupportedOperationException( - "Iceberg encryption currently supports only parquet format for data files"); - } - int dataKeyLength = PropertyUtil.propertyAsInt( tableProperties, @@ -104,4 +93,8 @@ public static EncryptionManager createEncryptionManager( return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); } + + public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { + return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty()); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionInputFile.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionInputFile.java new file mode 100644 index 000000000000..15ead6bac2b9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionInputFile.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import org.apache.iceberg.io.InputFile; + +/** An {@link EncryptedInputFile} that can be used for format-native encryption. */ +public interface NativeEncryptionInputFile extends EncryptedInputFile, InputFile { + @Override + NativeEncryptionKeyMetadata keyMetadata(); +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java new file mode 100644 index 000000000000..c2ed9d564d1e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.nio.ByteBuffer; + +/** {@link EncryptionKeyMetadata} for use with format-native encryption. */ +public interface NativeEncryptionKeyMetadata extends EncryptionKeyMetadata { + /** Encryption key as a {@link ByteBuffer} */ + ByteBuffer encryptionKey(); + + /** Additional authentication data as a {@link ByteBuffer} */ + ByteBuffer aadPrefix(); +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java new file mode 100644 index 000000000000..0d0d5da8a677 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import org.apache.iceberg.io.OutputFile; + +/** An {@link EncryptedOutputFile} that can be used for format-native encryption. */ +public interface NativeEncryptionOutputFile extends EncryptedOutputFile { + @Override + NativeEncryptionKeyMetadata keyMetadata(); + + /** An {@link OutputFile} instance for the underlying (plaintext) output stream. */ + OutputFile plainOutputFile(); +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 63f89e7661b3..185f4d6f81bb 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -54,19 +54,18 @@ public StandardEncryptionManager( } @Override - public EncryptedOutputFile encrypt(OutputFile plainOutput) { + public NativeEncryptionOutputFile encrypt(OutputFile plainOutput) { return new StandardEncryptedOutputFile(plainOutput, dataKeyLength); } @Override - public InputFile decrypt(EncryptedInputFile encrypted) { + public NativeEncryptionInputFile decrypt(EncryptedInputFile encrypted) { // this input file will lazily parse key metadata in case the file is not an AES GCM stream. return new StandardDecryptedInputFile(encrypted); } @Override public Iterable decrypt(Iterable encrypted) { - // Bulk decrypt is only applied to data files. Returning source input files for parquet. return Iterables.transform(encrypted, this::decrypt); } @@ -96,7 +95,7 @@ public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); } - private class StandardEncryptedOutputFile implements EncryptedOutputFile { + private class StandardEncryptedOutputFile implements NativeEncryptionOutputFile { private final OutputFile plainOutputFile; private final int dataKeyLength; private StandardKeyMetadata lazyKeyMetadata = null; @@ -141,7 +140,7 @@ public OutputFile plainOutputFile() { } } - private static class StandardDecryptedInputFile implements InputFile { + private static class StandardDecryptedInputFile implements NativeEncryptionInputFile { private final EncryptedInputFile encryptedInputFile; private StandardKeyMetadata lazyKeyMetadata = null; private AesGcmInputFile lazyDecryptedInputFile = null; @@ -150,7 +149,13 @@ private StandardDecryptedInputFile(EncryptedInputFile encryptedInputFile) { this.encryptedInputFile = encryptedInputFile; } - private StandardKeyMetadata keyMetadata() { + @Override + public InputFile encryptedInputFile() { + return encryptedInputFile.encryptedInputFile(); + } + + @Override + public StandardKeyMetadata keyMetadata() { if (null == lazyKeyMetadata) { this.lazyKeyMetadata = StandardKeyMetadata.castOrParse(encryptedInputFile.keyMetadata()); } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java index 2ac70aebc316..08466f75fe21 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java @@ -31,7 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; -class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord { +class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord { private static final byte V1 = 1; private static final Schema SCHEMA_V1 = new Schema( @@ -73,11 +73,13 @@ static Map supportedAvroSchemaVersions() { return avroSchemaVersions; } - ByteBuffer encryptionKey() { + @Override + public ByteBuffer encryptionKey() { return encryptionKey; } - ByteBuffer aadPrefix() { + @Override + public ByteBuffer aadPrefix() { return aadPrefix; } diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java index 59b0b4b3bf6a..1b320cae64e0 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java @@ -40,6 +40,17 @@ public interface FileAppenderFactory { */ FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat); + /** + * Create a new {@link FileAppender}. + * + * @param outputFile an EncryptedOutputFile used to create an output stream. + * @param fileFormat File format. + * @return a newly created {@link FileAppender} + */ + default FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) { + return newAppender(outputFile.encryptingOutputFile(), fileFormat); + } + /** * Create a new {@link DataWriter}. * diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java index 976b98b0a9fe..9e37c723be93 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java @@ -35,7 +35,6 @@ import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileWriterFactory; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -93,7 +92,6 @@ protected BaseFileWriterFactory( @Override public DataWriter newDataWriter( EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { - OutputFile outputFile = file.encryptingOutputFile(); EncryptionKeyMetadata keyMetadata = file.keyMetadata(); Map properties = table.properties(); MetricsConfig metricsConfig = MetricsConfig.forTable(table); @@ -102,7 +100,7 @@ public DataWriter newDataWriter( switch (dataFileFormat) { case AVRO: Avro.DataWriteBuilder avroBuilder = - Avro.writeData(outputFile) + Avro.writeData(file) .schema(dataSchema) .setAll(properties) .metricsConfig(metricsConfig) @@ -118,7 +116,7 @@ public DataWriter newDataWriter( case PARQUET: Parquet.DataWriteBuilder parquetBuilder = - Parquet.writeData(outputFile) + Parquet.writeData(file) .schema(dataSchema) .setAll(properties) .metricsConfig(metricsConfig) @@ -134,7 +132,7 @@ public DataWriter newDataWriter( case ORC: ORC.DataWriteBuilder orcBuilder = - ORC.writeData(outputFile) + ORC.writeData(file) .schema(dataSchema) .setAll(properties) .metricsConfig(metricsConfig) @@ -160,7 +158,6 @@ public DataWriter newDataWriter( @Override public EqualityDeleteWriter newEqualityDeleteWriter( EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { - OutputFile outputFile = file.encryptingOutputFile(); EncryptionKeyMetadata keyMetadata = file.keyMetadata(); Map properties = table.properties(); MetricsConfig metricsConfig = MetricsConfig.forTable(table); @@ -169,7 +166,7 @@ public EqualityDeleteWriter newEqualityDeleteWriter( switch (deleteFileFormat) { case AVRO: Avro.DeleteWriteBuilder avroBuilder = - Avro.writeDeletes(outputFile) + Avro.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(equalityDeleteRowSchema) @@ -186,7 +183,7 @@ public EqualityDeleteWriter newEqualityDeleteWriter( case PARQUET: Parquet.DeleteWriteBuilder parquetBuilder = - Parquet.writeDeletes(outputFile) + Parquet.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(equalityDeleteRowSchema) @@ -203,7 +200,7 @@ public EqualityDeleteWriter newEqualityDeleteWriter( case ORC: ORC.DeleteWriteBuilder orcBuilder = - ORC.writeDeletes(outputFile) + ORC.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(equalityDeleteRowSchema) @@ -230,7 +227,6 @@ public EqualityDeleteWriter newEqualityDeleteWriter( @Override public PositionDeleteWriter newPositionDeleteWriter( EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { - OutputFile outputFile = file.encryptingOutputFile(); EncryptionKeyMetadata keyMetadata = file.keyMetadata(); Map properties = table.properties(); MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table); @@ -239,7 +235,7 @@ public PositionDeleteWriter newPositionDeleteWriter( switch (deleteFileFormat) { case AVRO: Avro.DeleteWriteBuilder avroBuilder = - Avro.writeDeletes(outputFile) + Avro.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(positionDeleteRowSchema) @@ -254,7 +250,7 @@ public PositionDeleteWriter newPositionDeleteWriter( case PARQUET: Parquet.DeleteWriteBuilder parquetBuilder = - Parquet.writeDeletes(outputFile) + Parquet.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(positionDeleteRowSchema) @@ -269,7 +265,7 @@ public PositionDeleteWriter newPositionDeleteWriter( case ORC: ORC.DeleteWriteBuilder orcBuilder = - ORC.writeDeletes(outputFile) + ORC.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(positionDeleteRowSchema) diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index 23a94ebc9944..abad5d261d51 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -33,6 +33,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; @@ -84,11 +85,17 @@ public GenericAppenderFactory setAll(Map properties) { @Override public FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat) { + return newAppender(EncryptionUtil.plainAsEncryptedOutput(outputFile), fileFormat); + } + + @Override + public FileAppender newAppender( + EncryptedOutputFile encryptedOutputFile, FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); try { switch (fileFormat) { case AVRO: - return Avro.write(outputFile) + return Avro.write(encryptedOutputFile) .schema(schema) .createWriterFunc(DataWriter::create) .metricsConfig(metricsConfig) @@ -97,7 +104,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo .build(); case PARQUET: - return Parquet.write(outputFile) + return Parquet.write(encryptedOutputFile) .schema(schema) .createWriterFunc(GenericParquetWriter::buildWriter) .setAll(config) @@ -106,7 +113,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo .build(); case ORC: - return ORC.write(outputFile) + return ORC.write(encryptedOutputFile) .schema(schema) .createWriterFunc(GenericOrcWriter::buildWriter) .setAll(config) @@ -127,7 +134,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo public org.apache.iceberg.io.DataWriter newDataWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { return new org.apache.iceberg.io.DataWriter<>( - newAppender(file.encryptingOutputFile(), format), + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, @@ -146,10 +153,11 @@ public EqualityDeleteWriter newEqDeleteWriter( "Equality delete row schema shouldn't be null when creating equality-delete writer"); MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { switch (format) { case AVRO: - return Avro.writeDeletes(file.encryptingOutputFile()) + return Avro.writeDeletes(file) .createWriterFunc(DataWriter::create) .withPartition(partition) .overwrite() @@ -161,7 +169,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case ORC: - return ORC.writeDeletes(file.encryptingOutputFile()) + return ORC.writeDeletes(file) .createWriterFunc(GenericOrcWriter::buildWriter) .withPartition(partition) .overwrite() @@ -174,7 +182,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case PARQUET: - return Parquet.writeDeletes(file.encryptingOutputFile()) + return Parquet.writeDeletes(file) .createWriterFunc(GenericParquetWriter::buildWriter) .withPartition(partition) .overwrite() @@ -199,10 +207,11 @@ public EqualityDeleteWriter newEqDeleteWriter( public PositionDeleteWriter newPosDeleteWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { switch (format) { case AVRO: - return Avro.writeDeletes(file.encryptingOutputFile()) + return Avro.writeDeletes(file) .createWriterFunc(DataWriter::create) .withPartition(partition) .overwrite() @@ -213,7 +222,7 @@ public PositionDeleteWriter newPosDeleteWriter( .buildPositionWriter(); case ORC: - return ORC.writeDeletes(file.encryptingOutputFile()) + return ORC.writeDeletes(file) .createWriterFunc(GenericOrcWriter::buildWriter) .withPartition(partition) .overwrite() @@ -224,7 +233,7 @@ public PositionDeleteWriter newPosDeleteWriter( .buildPositionWriter(); case PARQUET: - return Parquet.writeDeletes(file.encryptingOutputFile()) + return Parquet.writeDeletes(file) .createWriterFunc(GenericParquetWriter::buildWriter) .withPartition(partition) .overwrite() diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index b6f1392d1562..eacef58a8d5d 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -35,6 +35,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionUtil; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.flink.data.FlinkOrcWriter; @@ -99,6 +100,11 @@ private RowType lazyPosDeleteFlinkSchema() { @Override public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + return newAppender(EncryptionUtil.plainAsEncryptedOutput(outputFile), format); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat format) { MetricsConfig metricsConfig = MetricsConfig.forTable(table); try { switch (format) { @@ -142,7 +148,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma public DataWriter newDataWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { return new DataWriter<>( - newAppender(file.encryptingOutputFile(), format), + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, @@ -164,7 +170,7 @@ public EqualityDeleteWriter newEqDeleteWriter( try { switch (format) { case AVRO: - return Avro.writeDeletes(outputFile.encryptingOutputFile()) + return Avro.writeDeletes(outputFile) .createWriterFunc(ignore -> new FlinkAvroWriter(lazyEqDeleteFlinkSchema())) .withPartition(partition) .overwrite() @@ -177,7 +183,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case ORC: - return ORC.writeDeletes(outputFile.encryptingOutputFile()) + return ORC.writeDeletes(outputFile) .createWriterFunc( (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema)) .withPartition(partition) @@ -191,7 +197,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case PARQUET: - return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + return Parquet.writeDeletes(outputFile) .createWriterFunc( msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType)) .withPartition(partition) @@ -220,7 +226,7 @@ public PositionDeleteWriter newPosDeleteWriter( try { switch (format) { case AVRO: - return Avro.writeDeletes(outputFile.encryptingOutputFile()) + return Avro.writeDeletes(outputFile) .createWriterFunc(ignore -> new FlinkAvroWriter(lazyPosDeleteFlinkSchema())) .withPartition(partition) .overwrite() @@ -234,7 +240,7 @@ public PositionDeleteWriter newPosDeleteWriter( case ORC: RowType orcPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema)); - return ORC.writeDeletes(outputFile.encryptingOutputFile()) + return ORC.writeDeletes(outputFile) .createWriterFunc( (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(orcPosDeleteSchema, iSchema)) .withPartition(partition) @@ -250,7 +256,7 @@ public PositionDeleteWriter newPosDeleteWriter( case PARQUET: RowType flinkPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema)); - return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + return Parquet.writeDeletes(outputFile) .createWriterFunc( msgType -> FlinkParquetWriters.buildWriter(flinkPosDeleteSchema, msgType)) .withPartition(partition) diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 89cd1ad4362a..7d1405bbd45b 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -63,7 +63,10 @@ import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionInputFile; +import org.apache.iceberg.encryption.NativeEncryptionOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.hadoop.HadoopInputFile; @@ -101,6 +104,12 @@ public static WriteBuilder write(OutputFile file) { return new WriteBuilder(file); } + public static WriteBuilder write(EncryptedOutputFile file) { + Preconditions.checkState( + !(file instanceof NativeEncryptionOutputFile), "Native ORC encryption is not supported"); + return new WriteBuilder(file.encryptingOutputFile()); + } + public static class WriteBuilder { private final OutputFile file; private final Configuration conf; @@ -382,6 +391,12 @@ public static DataWriteBuilder writeData(OutputFile file) { return new DataWriteBuilder(file); } + public static DataWriteBuilder writeData(EncryptedOutputFile file) { + Preconditions.checkState( + !(file instanceof NativeEncryptionOutputFile), "Native ORC encryption is not supported"); + return new DataWriteBuilder(file.encryptingOutputFile()); + } + public static class DataWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; @@ -479,6 +494,12 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) { return new DeleteWriteBuilder(file); } + public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) { + Preconditions.checkState( + !(file instanceof NativeEncryptionOutputFile), "Native ORC encryption is not supported"); + return new DeleteWriteBuilder(file.encryptingOutputFile()); + } + public static class DeleteWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; @@ -657,6 +678,9 @@ public PositionDeleteWriter buildPositionWriter() { } public static ReadBuilder read(InputFile file) { + Preconditions.checkState( + !(file instanceof NativeEncryptionInputFile), "Native ORC encryption is not supported"); + return new ReadBuilder(file); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index d240c84b9e4d..f80810acdd7f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -71,7 +71,10 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionInputFile; +import org.apache.iceberg.encryption.NativeEncryptionOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.hadoop.HadoopInputFile; @@ -125,6 +128,17 @@ public static WriteBuilder write(OutputFile file) { return new WriteBuilder(file); } + public static WriteBuilder write(EncryptedOutputFile file) { + if (file instanceof NativeEncryptionOutputFile) { + NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file; + return write(nativeFile.plainOutputFile()) + .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) + .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); + } else { + return write(file.encryptingOutputFile()); + } + } + public static class WriteBuilder { private final OutputFile file; private final Configuration conf; @@ -608,6 +622,17 @@ public static DataWriteBuilder writeData(OutputFile file) { return new DataWriteBuilder(file); } + public static DataWriteBuilder writeData(EncryptedOutputFile file) { + if (file instanceof NativeEncryptionOutputFile) { + NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file; + return writeData(nativeFile.plainOutputFile()) + .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) + .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); + } else { + return writeData(file.encryptingOutputFile()); + } + } + public static class DataWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; @@ -715,6 +740,17 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) { return new DeleteWriteBuilder(file); } + public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) { + if (file instanceof NativeEncryptionOutputFile) { + NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file; + return writeDeletes(nativeFile.plainOutputFile()) + .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) + .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); + } else { + return writeDeletes(file.encryptingOutputFile()); + } + } + public static class DeleteWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; @@ -957,7 +993,14 @@ protected WriteSupport getWriteSupport(Configuration configuration) { } public static ReadBuilder read(InputFile file) { - return new ReadBuilder(file); + if (file instanceof NativeEncryptionInputFile) { + NativeEncryptionInputFile nativeFile = (NativeEncryptionInputFile) file; + return new ReadBuilder(nativeFile.encryptedInputFile()) + .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey()) + .withAADPrefix(nativeFile.keyMetadata().aadPrefix()); + } else { + return new ReadBuilder(file); + } } public static class ReadBuilder { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index 6372edde0782..9df12fc060ae 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -31,6 +31,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionUtil; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; @@ -162,6 +163,11 @@ private StructType lazyPosDeleteSparkType() { @Override public FileAppender newAppender(OutputFile file, FileFormat fileFormat) { + return newAppender(EncryptionUtil.plainAsEncryptedOutput(file), fileFormat); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile file, FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); try { switch (fileFormat) { @@ -203,7 +209,7 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor public DataWriter newDataWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { return new DataWriter<>( - newAppender(file.encryptingOutputFile(), format), + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, @@ -224,7 +230,7 @@ public EqualityDeleteWriter newEqDeleteWriter( try { switch (format) { case PARQUET: - return Parquet.writeDeletes(file.encryptingOutputFile()) + return Parquet.writeDeletes(file) .createWriterFunc( msgType -> SparkParquetWriters.buildWriter(lazyEqDeleteSparkType(), msgType)) .overwrite() @@ -236,7 +242,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case AVRO: - return Avro.writeDeletes(file.encryptingOutputFile()) + return Avro.writeDeletes(file) .createWriterFunc(ignored -> new SparkAvroWriter(lazyEqDeleteSparkType())) .overwrite() .rowSchema(eqDeleteRowSchema) @@ -247,7 +253,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case ORC: - return ORC.writeDeletes(file.encryptingOutputFile()) + return ORC.writeDeletes(file) .createWriterFunc(SparkOrcWriter::new) .overwrite() .rowSchema(eqDeleteRowSchema) @@ -274,7 +280,7 @@ public PositionDeleteWriter newPosDeleteWriter( case PARQUET: StructType sparkPosDeleteSchema = SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema)); - return Parquet.writeDeletes(file.encryptingOutputFile()) + return Parquet.writeDeletes(file) .createWriterFunc( msgType -> SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType)) .overwrite() @@ -286,7 +292,7 @@ public PositionDeleteWriter newPosDeleteWriter( .buildPositionWriter(); case AVRO: - return Avro.writeDeletes(file.encryptingOutputFile()) + return Avro.writeDeletes(file) .createWriterFunc(ignored -> new SparkAvroWriter(lazyPosDeleteSparkType())) .overwrite() .rowSchema(posDeleteRowSchema) @@ -296,7 +302,7 @@ public PositionDeleteWriter newPosDeleteWriter( .buildPositionWriter(); case ORC: - return ORC.writeDeletes(file.encryptingOutputFile()) + return ORC.writeDeletes(file) .createWriterFunc(SparkOrcWriter::new) .overwrite() .rowSchema(posDeleteRowSchema)