Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
use key and aadPrefix explicitly

util

post-review changes

package-private KeyMetadata
  • Loading branch information
ggershinsky committed Jun 11, 2023
1 parent 32a8ef5 commit 2e0c52f
Show file tree
Hide file tree
Showing 19 changed files with 226 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public interface EncryptedOutputFile {
* #encryptingOutputFile()}.
*/
EncryptionKeyMetadata keyMetadata();

/** Underlying output file for native encryption. */
default OutputFile rawOutputFile() {
return null;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,12 @@ static EncryptionKeyMetadata empty() {
ByteBuffer buffer();

EncryptionKeyMetadata copy();

default ByteBuffer encryptionKey() {
return null;
}

default ByteBuffer aadPrefix() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,9 @@ public static EncryptedOutputFile encryptedOutput(
encryptedOutputFile, BaseEncryptionKeyMetadata.fromByteArray(keyMetadata));
}

public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) {
return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty());
}

private EncryptedFiles() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import java.nio.ByteBuffer;
import org.apache.iceberg.common.DynConstructors;

public class EncryptionUtil {

private EncryptionUtil() {}

public static EncryptionKeyMetadata parseKeyMetadata(ByteBuffer metadataBuffer) {
return KeyMetadata.parse(metadataBuffer);
}

public static EncryptionKeyMetadata createKeyMetadata(ByteBuffer key, ByteBuffer aadPrefix) {
return new KeyMetadata(key, aadPrefix);
}

static KeyManagementClient createKmsClient(String kmsImpl) {
DynConstructors.Ctor<KeyManagementClient> ctor;
try {
ctor = DynConstructors.builder(KeyManagementClient.class).impl(kmsImpl).buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize KeyManagementClient, missing no-arg constructor for class %s",
kmsImpl),
e);
}

try {
return ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize kms client, %s does not implement KeyManagementClient interface",
kmsImpl),
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@
*/
package org.apache.iceberg.encryption;

import java.nio.ByteBuffer;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlaintextEncryptionManager implements EncryptionManager {
private static final EncryptionManager INSTANCE = new PlaintextEncryptionManager();
private static final Logger LOG = LoggerFactory.getLogger(PlaintextEncryptionManager.class);

public static EncryptionManager instance() {
return INSTANCE;
}

@Override
public InputFile decrypt(EncryptedInputFile encrypted) {
if (encrypted.keyMetadata().buffer() != null) {
if (encrypted.keyMetadata() != null && encrypted.keyMetadata().buffer() != null) {
LOG.warn(
"File encryption key metadata is present, but currently using PlaintextEncryptionManager.");
}
Expand All @@ -38,6 +42,6 @@ public InputFile decrypt(EncryptedInputFile encrypted) {

@Override
public EncryptedOutputFile encrypt(OutputFile rawOutput) {
return EncryptedFiles.encryptedOutput(rawOutput, (ByteBuffer) null);
return EncryptedFiles.encryptedOutput(rawOutput, EncryptionKeyMetadata.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public interface FileAppenderFactory<T> {
*/
FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);

default FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
return newAppender(outputFile.encryptingOutputFile(), fileFormat);
}

/**
* Create a new {@link DataWriter}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ protected BaseFileWriterFactory(
@Override
public DataWriter<T> newDataWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Expand All @@ -102,7 +101,7 @@ public DataWriter<T> newDataWriter(
switch (dataFileFormat) {
case AVRO:
Avro.DataWriteBuilder avroBuilder =
Avro.writeData(outputFile)
Avro.writeData(file.encryptingOutputFile())
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
Expand All @@ -118,13 +117,15 @@ public DataWriter<T> newDataWriter(

case PARQUET:
Parquet.DataWriteBuilder parquetBuilder =
Parquet.writeData(outputFile)
Parquet.writeData(file.rawOutputFile())
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withFileEncryptionKey(keyMetadata.encryptionKey())
.withAADPrefix(keyMetadata.aadPrefix())
.withSortOrder(dataSortOrder)
.overwrite();

Expand All @@ -134,7 +135,7 @@ public DataWriter<T> newDataWriter(

case ORC:
ORC.DataWriteBuilder orcBuilder =
ORC.writeData(outputFile)
ORC.writeData(file.encryptingOutputFile())
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
Expand Down Expand Up @@ -194,6 +195,8 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withFileEncryptionKey(keyMetadata.encryptionKey())
.withAADPrefix(keyMetadata.aadPrefix())
.withSortOrder(equalityDeleteSortOrder)
.overwrite();

Expand Down Expand Up @@ -261,6 +264,8 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withFileEncryptionKey(keyMetadata.encryptionKey())
.withAADPrefix(keyMetadata.aadPrefix())
.overwrite();

configurePositionDelete(parquetBuilder);
Expand Down
9 changes: 9 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
Expand Down Expand Up @@ -293,6 +295,13 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
}

if (deleteFile.keyMetadata() != null) {
EncryptionKeyMetadata keyMetadata =
EncryptionUtil.parseKeyMetadata(deleteFile.keyMetadata());
builder.withFileEncryptionKey(keyMetadata.encryptionKey());
builder.withAADPrefix(keyMetadata.aadPrefix());
}

return builder.build();

case ORC:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
Expand Down Expand Up @@ -84,11 +86,17 @@ public GenericAppenderFactory setAll(Map<String, String> properties) {

@Override
public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
return newAppender(
EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.empty()), fileFormat);
}

@Override
public FileAppender<Record> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
try {
switch (fileFormat) {
case AVRO:
return Avro.write(outputFile)
return Avro.write(outputFile.encryptingOutputFile())
.schema(schema)
.createWriterFunc(DataWriter::create)
.metricsConfig(metricsConfig)
Expand All @@ -97,16 +105,18 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
.build();

case PARQUET:
return Parquet.write(outputFile)
return Parquet.write(outputFile.rawOutputFile())
.schema(schema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.setAll(config)
.metricsConfig(metricsConfig)
.overwrite()
.withFileEncryptionKey(outputFile.keyMetadata().encryptionKey())
.withAADPrefix(outputFile.keyMetadata().aadPrefix())
.build();

case ORC:
return ORC.write(outputFile)
return ORC.write(outputFile.encryptingOutputFile())
.schema(schema)
.createWriterFunc(GenericOrcWriter::buildWriter)
.setAll(config)
Expand All @@ -127,7 +137,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
public org.apache.iceberg.io.DataWriter<Record> newDataWriter(
EncryptedOutputFile file, FileFormat format, StructLike partition) {
return new org.apache.iceberg.io.DataWriter<>(
newAppender(file.encryptingOutputFile(), format),
newAppender(file, format),
format,
file.encryptingOutputFile().location(),
spec,
Expand Down Expand Up @@ -183,6 +193,8 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
.rowSchema(eqDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.withFileEncryptionKey(file.keyMetadata().encryptionKey())
.withAADPrefix(file.keyMetadata().aadPrefix())
.equalityFieldIds(equalityFieldIds)
.buildEqualityWriter();

Expand Down Expand Up @@ -233,6 +245,8 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
.rowSchema(posDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.withFileEncryptionKey(file.keyMetadata().encryptionKey())
.withAADPrefix(file.keyMetadata().aadPrefix())
.buildPositionWriter();

default:
Expand Down
9 changes: 9 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/GenericReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -125,6 +127,13 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
parquet.reuseContainers();
}

if (task.file().keyMetadata() != null) {
EncryptionKeyMetadata keyMetadata =
EncryptionUtil.parseKeyMetadata(task.file().keyMetadata());
parquet.withFileEncryptionKey(keyMetadata.encryptionKey());
parquet.withAADPrefix(keyMetadata.aadPrefix());
}

return parquet.build();

case ORC:
Expand Down
Loading

0 comments on commit 2e0c52f

Please sign in to comment.