Commit 091bc2a (#6762), main branch
ggershinsky committed Dec 21, 2023
1 parent 2eea697
Showing 21 changed files with 269 additions and 66 deletions.
22 changes: 22 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -55,6 +55,7 @@
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.DeleteSchemaUtil;
@@ -91,6 +92,13 @@ public static WriteBuilder write(OutputFile file) {
return new WriteBuilder(file);
}

public static WriteBuilder write(EncryptedOutputFile file) {
Preconditions.checkState(
file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
"Currenty, encryption of data files in Avro format is not supported");
return new WriteBuilder(file.encryptingOutputFile());
}

public static class WriteBuilder {
private final OutputFile file;
private final Map<String, String> config = Maps.newHashMap();
@@ -272,6 +280,13 @@ public static DataWriteBuilder writeData(OutputFile file) {
return new DataWriteBuilder(file);
}

public static DataWriteBuilder writeData(EncryptedOutputFile file) {
Preconditions.checkState(
file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
"Currenty, encryption of data files in Avro format is not supported");
return new DataWriteBuilder(file.encryptingOutputFile());
}

public static class DataWriteBuilder {
private final WriteBuilder appenderBuilder;
private final String location;
@@ -368,6 +383,13 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
return new DeleteWriteBuilder(file);
}

public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
Preconditions.checkState(
file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
"Currenty, encryption of delete files in Avro format is not supported");
return new DeleteWriteBuilder(file.encryptingOutputFile());
}

public static class DeleteWriteBuilder {
private final WriteBuilder appenderBuilder;
private final String location;
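All three new overloads follow one pattern: accept an EncryptedOutputFile, check that it carries no real key metadata, and delegate to the plain-file builder. A minimal sketch of a caller, assuming the generic Avro value writer from org.apache.iceberg.data.avro is used; the method name and arguments below are illustrative, not part of this commit:

import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;

// Sketch: wrap a plain OutputFile so it satisfies the new signature. A file
// carrying real key metadata would fail the Preconditions check above.
static FileAppender<Record> newPlainAvroAppender(OutputFile out, Schema schema)
    throws IOException {
  EncryptedOutputFile plain = EncryptionUtil.plainAsEncryptedOutput(out);
  return Avro.write(plain)
      .schema(schema)
      .createWriterFunc(DataWriter::create)
      .build();
}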
core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
@@ -18,18 +18,24 @@
*/
package org.apache.iceberg.encryption;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;

public class EncryptionUtil {

private EncryptionUtil() {}

public static EncryptionKeyMetadata createKeyMetadata(ByteBuffer key, ByteBuffer aadPrefix) {
return new StandardKeyMetadata(key.array(), aadPrefix.array());
}

public static KeyManagementClient createKmsClient(Map<String, String> catalogProperties) {
String kmsType = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_TYPE);
String kmsImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_IMPL);
@@ -86,10 +92,7 @@ public static EncryptionManager createEncryptionManager(
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);

- if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) {
- throw new UnsupportedOperationException(
- "Iceberg encryption currently supports only parquet format for data files");
- }
+ boolean nativeDataEncryption = (FileFormat.fromString(fileFormat) == FileFormat.PARQUET);

int dataKeyLength =
PropertyUtil.propertyAsInt(
@@ -102,6 +105,11 @@
"Invalid data key length: %s (must be 16, 24, or 32)",
dataKeyLength);

- return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient);
+ return new StandardEncryptionManager(
+     tableKeyId, dataKeyLength, kmsClient, nativeDataEncryption);
}

public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) {
return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty());
}
}
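The new createKeyMetadata helper wraps raw key material in a StandardKeyMetadata; both buffers must be array-backed, since the implementation calls ByteBuffer.array(). A small usage sketch with placeholder key bytes (real keys would come from a secure random source or a KMS, not zeroed arrays):

import java.nio.ByteBuffer;

// Sketch: a 128-bit data key and AAD prefix, zeroed purely for illustration.
ByteBuffer key = ByteBuffer.wrap(new byte[16]);
ByteBuffer aadPrefix = ByteBuffer.wrap(new byte[16]);
EncryptionKeyMetadata keyMetadata = EncryptionUtil.createKeyMetadata(key, aadPrefix);
// keyMetadata.buffer() serializes into the form StandardKeyMetadata.parse reads back.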
core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
@@ -32,6 +32,7 @@ public class StandardEncryptionManager implements EncryptionManager {
private final transient KeyManagementClient kmsClient;
private final String tableKeyId;
private final int dataKeyLength;
private final boolean nativeDataEncryption;

private transient volatile SecureRandom lazyRNG = null;

@@ -41,7 +42,10 @@ public class StandardEncryptionManager implements EncryptionManager {
* @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption
*/
public StandardEncryptionManager(
- String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) {
+ String tableKeyId,
+ int dataKeyLength,
+ KeyManagementClient kmsClient,
+ boolean nativeDataEncryption) {
Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null");
Preconditions.checkArgument(
dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32,
@@ -51,6 +55,7 @@ public StandardEncryptionManager(
this.tableKeyId = tableKeyId;
this.kmsClient = kmsClient;
this.dataKeyLength = dataKeyLength;
this.nativeDataEncryption = nativeDataEncryption;
}

@Override
@@ -67,7 +72,15 @@ public InputFile decrypt(EncryptedInputFile encrypted) {
@Override
public Iterable<InputFile> decrypt(Iterable<EncryptedInputFile> encrypted) {
// Bulk decrypt is only applied to data files. Returning source input files for parquet.
- return Iterables.transform(encrypted, this::decrypt);
+ if (nativeDataEncryption) {
+ return Iterables.transform(encrypted, this::getSourceFile);
+ } else {
+ return Iterables.transform(encrypted, this::decrypt);
+ }
}

private InputFile getSourceFile(EncryptedInputFile encryptedFile) {
return encryptedFile.encryptedInputFile();
}

private SecureRandom workerRNG() {
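The extra constructor argument is the one breaking change here: callers now state whether the table's data file format encrypts natively. A sketch of the two resulting bulk-decrypt behaviors; keyId, kmsClient, and encryptedFiles are hypothetical stand-ins:

// Parquet tables: data files decrypt themselves from their key metadata, so
// bulk decrypt just returns the raw source streams.
StandardEncryptionManager forParquet =
    new StandardEncryptionManager(keyId, 16, kmsClient, /* nativeDataEncryption= */ true);
Iterable<InputFile> sources = forParquet.decrypt(encryptedFiles);

// Other formats: each file is individually wrapped for stream-level decryption.
StandardEncryptionManager forOthers =
    new StandardEncryptionManager(keyId, 16, kmsClient, /* nativeDataEncryption= */ false);
Iterable<InputFile> decrypted = forOthers.decrypt(encryptedFiles);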
core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java
@@ -31,7 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

- class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
+ public class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
private static final byte V1 = 1;
private static final Schema SCHEMA_V1 =
new Schema(
@@ -73,11 +73,11 @@ static Map<Byte, org.apache.avro.Schema> supportedAvroSchemaVersions() {
return avroSchemaVersions;
}

- ByteBuffer encryptionKey() {
+ public ByteBuffer encryptionKey() {
return encryptionKey;
}

- ByteBuffer aadPrefix() {
+ public ByteBuffer aadPrefix() {
return aadPrefix;
}

@@ -95,7 +95,7 @@ static StandardKeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) {
return parse(kmBuffer);
}

- static StandardKeyMetadata parse(ByteBuffer buffer) {
+ public static StandardKeyMetadata parse(ByteBuffer buffer) {
try {
return KEY_METADATA_DECODER.decode(buffer);
} catch (IOException e) {
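Widening the visibility lets code outside org.apache.iceberg.encryption, such as DeleteFilter below, decode the key metadata stored with a file. A round-trip sketch, assuming a hypothetical keyMetadataBuffer read from file metadata:

StandardKeyMetadata parsed = StandardKeyMetadata.parse(keyMetadataBuffer);
ByteBuffer dek = parsed.encryptionKey(); // per-file data encryption key
ByteBuffer aad = parsed.aadPrefix();     // AAD prefix for AES GCM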
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
@@ -40,6 +40,17 @@ public interface FileAppenderFactory<T> {
*/
FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);

/**
* Create a new {@link FileAppender}.
*
* @param outputFile an EncryptedOutputFile used to create an output stream
* @param fileFormat the file format
* @return a newly created {@link FileAppender}
*/
default FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
return newAppender(outputFile.encryptingOutputFile(), fileFormat);
}

/**
* Create a new {@link DataWriter}.
*
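The default method deliberately drops key metadata by unwrapping the file, so existing factories keep their current behavior. A factory that supports encrypted writes would override it; a hypothetical sketch, where newEncryptedParquetAppender is an invented helper and not part of this commit:

@Override
public FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
  if (fileFormat == FileFormat.PARQUET) {
    // Forward outputFile.keyMetadata() to the Parquet writer instead of dropping it.
    return newEncryptedParquetAppender(outputFile);
  }
  return newAppender(outputFile.encryptingOutputFile(), fileFormat);
}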
data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -35,7 +35,6 @@
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileWriterFactory;
- import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

@@ -93,7 +92,6 @@ protected BaseFileWriterFactory(
@Override
public DataWriter<T> newDataWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
- OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
@@ -102,7 +100,7 @@ public DataWriter<T> newDataWriter(
switch (dataFileFormat) {
case AVRO:
Avro.DataWriteBuilder avroBuilder =
- Avro.writeData(outputFile)
+ Avro.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
@@ -118,7 +116,7 @@

case PARQUET:
Parquet.DataWriteBuilder parquetBuilder =
- Parquet.writeData(outputFile)
+ Parquet.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
@@ -134,7 +132,7 @@

case ORC:
ORC.DataWriteBuilder orcBuilder =
- ORC.writeData(outputFile)
+ ORC.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
@@ -160,7 +158,6 @@ public DataWriter<T> newDataWriter(
@Override
public EqualityDeleteWriter<T> newEqualityDeleteWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
- OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
@@ -169,7 +166,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
switch (deleteFileFormat) {
case AVRO:
Avro.DeleteWriteBuilder avroBuilder =
- Avro.writeDeletes(outputFile)
+ Avro.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
@@ -186,7 +183,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
- Parquet.writeDeletes(outputFile)
+ Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
@@ -203,7 +200,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(

case ORC:
ORC.DeleteWriteBuilder orcBuilder =
- ORC.writeDeletes(outputFile)
+ ORC.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
@@ -230,7 +227,6 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
@Override
public PositionDeleteWriter<T> newPositionDeleteWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
- OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
@@ -239,7 +235,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
switch (deleteFileFormat) {
case AVRO:
Avro.DeleteWriteBuilder avroBuilder =
- Avro.writeDeletes(outputFile)
+ Avro.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
@@ -254,7 +250,7 @@

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
- Parquet.writeDeletes(outputFile)
+ Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
@@ -269,7 +265,7 @@

case ORC:
ORC.DeleteWriteBuilder orcBuilder =
- ORC.writeDeletes(outputFile)
+ ORC.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
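The factory no longer unwraps the EncryptedOutputFile itself; each format's write builder receives it and applies its own checks (Parquet can encrypt natively, while the Avro builders above reject real key metadata). End to end, a caller might look like this sketch, where table, location, spec, partition, and record are hypothetical:

EncryptionManager encryption = table.encryption();
EncryptedOutputFile file = encryption.encrypt(table.io().newOutputFile(location));

DataWriter<Record> writer = writerFactory.newDataWriter(file, spec, partition);
writer.write(record);
writer.close();
// The resulting DataFile carries file.keyMetadata(), so readers can recover
// the key needed to decrypt the file's contents.
DataFile dataFile = writer.toDataFile();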
7 changes: 7 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -36,6 +36,7 @@
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.encryption.StandardKeyMetadata;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
@@ -281,6 +282,12 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
}

if (deleteFile.keyMetadata() != null) {
StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(deleteFile.keyMetadata());
builder.withFileEncryptionKey(keyMetadata.encryptionKey());
builder.withAADPrefix(keyMetadata.aadPrefix());
}

return builder.build();

case ORC:
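With StandardKeyMetadata now public, the Parquet delete-file path can recover the per-file key and AAD prefix. The same pattern would apply when opening any encrypted Parquet file; a sketch assuming hypothetical inputFile, schema, and keyMetadataBuffer values:

StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(keyMetadataBuffer);
CloseableIterable<Record> rows =
    Parquet.read(inputFile)
        .project(schema)
        .withFileEncryptionKey(keyMetadata.encryptionKey())
        .withAADPrefix(keyMetadata.aadPrefix())
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        .build();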
(Diffs for the remaining 14 changed files were not loaded.)