Skip to content

Commit

Permalink
Core, Spark, Flink, Data: Deliver key metadata for encryption of data files (#9359)
Browse files Browse the repository at this point in the history
  • Loading branch information
ggershinsky authored Jan 4, 2024
1 parent 1a9c3f7 commit 1288eb8
Show file tree
Hide file tree
Showing 15 changed files with 263 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,4 @@ public interface EncryptedOutputFile {
* #encryptingOutputFile()}.
*/
EncryptionKeyMetadata keyMetadata();

/** Underlying output file for native encryption. */
default OutputFile plainOutputFile() {
throw new UnsupportedOperationException("Not implemented");
}
}
22 changes: 22 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.DeleteSchemaUtil;
Expand Down Expand Up @@ -92,6 +93,13 @@ public static WriteBuilder write(OutputFile file) {
return new WriteBuilder(file);
}

/**
 * Creates a {@link WriteBuilder} for an encrypted output file.
 *
 * <p>Avro has no format-native encryption support, so the file must not carry any encryption
 * key metadata; only a null or {@link EncryptionKeyMetadata#EMPTY empty} value is accepted.
 *
 * @param file the encrypted output file to write
 * @return a new {@link WriteBuilder}
 * @throws IllegalStateException if the file carries non-empty key metadata
 */
public static WriteBuilder write(EncryptedOutputFile file) {
  EncryptionKeyMetadata keyMetadata = file.keyMetadata();
  boolean isPlaintext = keyMetadata == null || keyMetadata == EncryptionKeyMetadata.EMPTY;
  Preconditions.checkState(isPlaintext, "Avro encryption is not supported");
  return new WriteBuilder(file.encryptingOutputFile());
}

public static class WriteBuilder {
private final OutputFile file;
private final Map<String, String> config = Maps.newHashMap();
Expand Down Expand Up @@ -273,6 +281,13 @@ public static DataWriteBuilder writeData(OutputFile file) {
return new DataWriteBuilder(file);
}

/**
 * Creates a {@link DataWriteBuilder} for an encrypted output file.
 *
 * <p>Avro has no format-native encryption support, so the file must not carry any encryption
 * key metadata; only a null or {@link EncryptionKeyMetadata#EMPTY empty} value is accepted.
 *
 * @param file the encrypted output file to write
 * @return a new {@link DataWriteBuilder}
 * @throws IllegalStateException if the file carries non-empty key metadata
 */
public static DataWriteBuilder writeData(EncryptedOutputFile file) {
  EncryptionKeyMetadata keyMetadata = file.keyMetadata();
  boolean isPlaintext = keyMetadata == null || keyMetadata == EncryptionKeyMetadata.EMPTY;
  Preconditions.checkState(isPlaintext, "Avro encryption is not supported");
  return new DataWriteBuilder(file.encryptingOutputFile());
}

public static class DataWriteBuilder {
private final WriteBuilder appenderBuilder;
private final String location;
Expand Down Expand Up @@ -369,6 +384,13 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
return new DeleteWriteBuilder(file);
}

/**
 * Creates a {@link DeleteWriteBuilder} for an encrypted output file.
 *
 * <p>Avro has no format-native encryption support, so the file must not carry any encryption
 * key metadata; only a null or {@link EncryptionKeyMetadata#EMPTY empty} value is accepted.
 *
 * @param file the encrypted output file to write
 * @return a new {@link DeleteWriteBuilder}
 * @throws IllegalStateException if the file carries non-empty key metadata
 */
public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
  EncryptionKeyMetadata keyMetadata = file.keyMetadata();
  boolean isPlaintext = keyMetadata == null || keyMetadata == EncryptionKeyMetadata.EMPTY;
  Preconditions.checkState(isPlaintext, "Avro encryption is not supported");
  return new DeleteWriteBuilder(file.encryptingOutputFile());
}

public static class DeleteWriteBuilder {
private final WriteBuilder appenderBuilder;
private final String location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;

Expand Down Expand Up @@ -80,17 +80,6 @@ public static EncryptionManager createEncryptionManager(
return PlaintextEncryptionManager.instance();
}

String fileFormat =
PropertyUtil.propertyAsString(
tableProperties,
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);

if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) {
throw new UnsupportedOperationException(
"Iceberg encryption currently supports only parquet format for data files");
}

int dataKeyLength =
PropertyUtil.propertyAsInt(
tableProperties,
Expand All @@ -104,4 +93,8 @@ public static EncryptionManager createEncryptionManager(

return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient);
}

/**
 * Wraps a plain {@link OutputFile} as an {@link EncryptedOutputFile} carrying empty key
 * metadata, for code paths that require the encrypted-file abstraction but no encryption.
 *
 * @param encryptingOutputFile the plain output file to wrap
 * @return an {@link EncryptedOutputFile} view of the file with empty key metadata
 */
public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) {
  return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import org.apache.iceberg.io.InputFile;

/**
 * An {@link EncryptedInputFile} that can be used for format-native encryption.
 *
 * <p>Also extends {@link InputFile} so it can be read directly; its key metadata is narrowed to
 * {@link NativeEncryptionKeyMetadata}, which exposes the key and AAD prefix.
 */
public interface NativeEncryptionInputFile extends EncryptedInputFile, InputFile {
  /** {@link NativeEncryptionKeyMetadata} for this file. */
  @Override
  NativeEncryptionKeyMetadata keyMetadata();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import java.nio.ByteBuffer;

/**
 * {@link EncryptionKeyMetadata} for use with format-native encryption.
 *
 * <p>Exposes the raw encryption key and the AAD (additional authenticated data) prefix used by a
 * file format's native encryption.
 */
public interface NativeEncryptionKeyMetadata extends EncryptionKeyMetadata {
  /** Encryption key as a {@link ByteBuffer}. */
  ByteBuffer encryptionKey();

  /** Additional authentication data prefix as a {@link ByteBuffer}. */
  ByteBuffer aadPrefix();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import org.apache.iceberg.io.OutputFile;

/**
 * An {@link EncryptedOutputFile} that can be used for format-native encryption.
 *
 * <p>In addition to the encrypting output file, exposes the underlying plaintext {@link
 * OutputFile} so that formats performing their own encryption can write the raw stream.
 */
public interface NativeEncryptionOutputFile extends EncryptedOutputFile {
  /** {@link NativeEncryptionKeyMetadata} for this file. */
  @Override
  NativeEncryptionKeyMetadata keyMetadata();

  /** An {@link OutputFile} instance for the underlying (plaintext) output stream. */
  OutputFile plainOutputFile();
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,18 @@ public StandardEncryptionManager(
}

@Override
public EncryptedOutputFile encrypt(OutputFile plainOutput) {
public NativeEncryptionOutputFile encrypt(OutputFile plainOutput) {
return new StandardEncryptedOutputFile(plainOutput, dataKeyLength);
}

@Override
public InputFile decrypt(EncryptedInputFile encrypted) {
public NativeEncryptionInputFile decrypt(EncryptedInputFile encrypted) {
// this input file will lazily parse key metadata in case the file is not an AES GCM stream.
return new StandardDecryptedInputFile(encrypted);
}

@Override
public Iterable<InputFile> decrypt(Iterable<EncryptedInputFile> encrypted) {
// Bulk decrypt is only applied to data files. Returning source input files for parquet.
return Iterables.transform(encrypted, this::decrypt);
}

Expand Down Expand Up @@ -96,7 +95,7 @@ public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) {
return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId);
}

private class StandardEncryptedOutputFile implements EncryptedOutputFile {
private class StandardEncryptedOutputFile implements NativeEncryptionOutputFile {
private final OutputFile plainOutputFile;
private final int dataKeyLength;
private StandardKeyMetadata lazyKeyMetadata = null;
Expand Down Expand Up @@ -141,7 +140,7 @@ public OutputFile plainOutputFile() {
}
}

private static class StandardDecryptedInputFile implements InputFile {
private static class StandardDecryptedInputFile implements NativeEncryptionInputFile {
private final EncryptedInputFile encryptedInputFile;
private StandardKeyMetadata lazyKeyMetadata = null;
private AesGcmInputFile lazyDecryptedInputFile = null;
Expand All @@ -150,7 +149,13 @@ private StandardDecryptedInputFile(EncryptedInputFile encryptedInputFile) {
this.encryptedInputFile = encryptedInputFile;
}

private StandardKeyMetadata keyMetadata() {
@Override
public InputFile encryptedInputFile() {
  // Delegate to the wrapped file's raw (still-encrypted) input file.
  return encryptedInputFile.encryptedInputFile();
}

@Override
public StandardKeyMetadata keyMetadata() {
if (null == lazyKeyMetadata) {
this.lazyKeyMetadata = StandardKeyMetadata.castOrParse(encryptedInputFile.keyMetadata());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord {
private static final byte V1 = 1;
private static final Schema SCHEMA_V1 =
new Schema(
Expand Down Expand Up @@ -73,11 +73,13 @@ static Map<Byte, org.apache.avro.Schema> supportedAvroSchemaVersions() {
return avroSchemaVersions;
}

ByteBuffer encryptionKey() {
@Override
public ByteBuffer encryptionKey() {
  // Raw key bytes carried by this key metadata.
  return encryptionKey;
}

ByteBuffer aadPrefix() {
@Override
public ByteBuffer aadPrefix() {
  // AAD (additional authenticated data) prefix carried by this key metadata.
  return aadPrefix;
}

Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public interface FileAppenderFactory<T> {
*/
FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);

/**
 * Create a new {@link FileAppender} for an encrypted output file.
 *
 * <p>By default this delegates to {@link #newAppender(OutputFile, FileFormat)} with the file's
 * encrypting output stream, ignoring any key metadata.
 *
 * @param outputFile an EncryptedOutputFile used to create an output stream.
 * @param fileFormat File format.
 * @return a newly created {@link FileAppender}
 */
default FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
  return newAppender(outputFile.encryptingOutputFile(), fileFormat);
}

/**
* Create a new {@link DataWriter}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

Expand Down Expand Up @@ -93,7 +92,6 @@ protected BaseFileWriterFactory(
@Override
public DataWriter<T> newDataWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Expand All @@ -102,7 +100,7 @@ public DataWriter<T> newDataWriter(
switch (dataFileFormat) {
case AVRO:
Avro.DataWriteBuilder avroBuilder =
Avro.writeData(outputFile)
Avro.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
Expand All @@ -118,7 +116,7 @@ public DataWriter<T> newDataWriter(

case PARQUET:
Parquet.DataWriteBuilder parquetBuilder =
Parquet.writeData(outputFile)
Parquet.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
Expand All @@ -134,7 +132,7 @@ public DataWriter<T> newDataWriter(

case ORC:
ORC.DataWriteBuilder orcBuilder =
ORC.writeData(outputFile)
ORC.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
Expand All @@ -160,7 +158,6 @@ public DataWriter<T> newDataWriter(
@Override
public EqualityDeleteWriter<T> newEqualityDeleteWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Expand All @@ -169,7 +166,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
switch (deleteFileFormat) {
case AVRO:
Avro.DeleteWriteBuilder avroBuilder =
Avro.writeDeletes(outputFile)
Avro.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
Expand All @@ -186,7 +183,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
Parquet.writeDeletes(outputFile)
Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
Expand All @@ -203,7 +200,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(

case ORC:
ORC.DeleteWriteBuilder orcBuilder =
ORC.writeDeletes(outputFile)
ORC.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
Expand All @@ -230,7 +227,6 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
@Override
public PositionDeleteWriter<T> newPositionDeleteWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
Expand All @@ -239,7 +235,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
switch (deleteFileFormat) {
case AVRO:
Avro.DeleteWriteBuilder avroBuilder =
Avro.writeDeletes(outputFile)
Avro.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
Expand All @@ -254,7 +250,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
Parquet.writeDeletes(outputFile)
Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
Expand All @@ -269,7 +265,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(

case ORC:
ORC.DeleteWriteBuilder orcBuilder =
ORC.writeDeletes(outputFile)
ORC.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
Expand Down
Loading

0 comments on commit 1288eb8

Please sign in to comment.