
Commit

initial commit
use key and aadPrefix explicitly

util

post-review changes

package-private KeyMetadata

fix key metadata method signatures

fix NPE

update PositionDeletesRowReader

use ALL_CAPS

move spark 3.3 to 3.4, flink 1.16 to 1.17

update spark source BaseReader

address review comments

clean up

update revapi

revert visibility limit

revert TableOperations

move plaintext manager changes to another pr

address review comments

revert BaseEncryptedOutputFile
ggershinsky committed Dec 21, 2023
1 parent c1e877a commit e20cda6
Showing 21 changed files with 280 additions and 38 deletions.
@@ -37,4 +37,9 @@ public interface EncryptedOutputFile {
* #encryptingOutputFile()}.
*/
EncryptionKeyMetadata keyMetadata();

/** Underlying output file for native encryption. */
default OutputFile rawOutputFile() {
throw new UnsupportedOperationException("Not implemented");
};
}
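A minimal usage sketch, not part of this commit: it assumes rawOutputFile() is the write target for formats that encrypt their own content (for example Parquet modular encryption), while encryptingOutputFile() remains the stream-encrypting wrapper. The helper class and the boolean flag are illustrative.

import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.OutputFile;

class OutputTargets {
  // Pick where a format writer should write its bytes.
  static OutputFile targetFor(EncryptedOutputFile file, boolean formatEncryptsNatively) {
    if (formatEncryptsNatively) {
      // The format encrypts its own pages/blocks, so use the raw sink to avoid double
      // encryption; implementations that predate this method throw UnsupportedOperationException.
      return file.rawOutputFile();
    }
    // Otherwise write through the envelope-encrypting wrapper.
    return file.encryptingOutputFile();
  }
}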
@@ -39,6 +39,11 @@ public ByteBuffer buffer() {
public EncryptionKeyMetadata copy() {
return this;
}

@Override
public ByteBuffer encryptionKey() {
return null;
}
};

static EncryptionKeyMetadata empty() {
@@ -49,4 +54,12 @@ static EncryptionKeyMetadata empty() {
ByteBuffer buffer();

EncryptionKeyMetadata copy();

default ByteBuffer encryptionKey() {
throw new UnsupportedOperationException("Not implemented");
}

default ByteBuffer aadPrefix() {
throw new UnsupportedOperationException("Not implemented");
}
}
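Because both new accessors default to throwing, a caller that may still receive pre-existing EncryptionKeyMetadata implementations could guard the call. A small defensive sketch under that assumption (the helper class is illustrative):

import java.nio.ByteBuffer;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;

class KeyMetadataAccess {
  // Returns the encryption key, or null when the implementation does not expose one.
  static ByteBuffer keyOrNull(EncryptionKeyMetadata metadata) {
    try {
      return metadata.encryptionKey();
    } catch (UnsupportedOperationException e) {
      return null; // implementation predates the encryptionKey() accessor
    }
  }
}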
@@ -57,5 +57,9 @@ public static EncryptedOutputFile encryptedOutput(
encryptedOutputFile, BaseEncryptionKeyMetadata.fromByteArray(keyMetadata));
}

public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) {
return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty());
}

private EncryptedFiles() {}
}
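A usage sketch for the new adapter (the local-file setup is assumed calling code, not part of the diff): it lets existing plain OutputFile call sites flow into the EncryptedOutputFile overloads introduced elsewhere in this commit.

import java.io.File;
import org.apache.iceberg.Files;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;

class PlainOutputAdapter {
  static EncryptedOutputFile adapt(File target) {
    // Wraps the plain file with empty key metadata, i.e. "no encryption".
    return EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(target));
  }
}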
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import java.nio.ByteBuffer;
import org.apache.iceberg.common.DynConstructors;

public class EncryptionUtil {

private EncryptionUtil() {}

public static KeyMetadata parseKeyMetadata(ByteBuffer metadataBuffer) {
return KeyMetadata.parse(metadataBuffer);
}

public static EncryptionKeyMetadata createKeyMetadata(ByteBuffer key, ByteBuffer aadPrefix) {
return new KeyMetadata(key, aadPrefix);
}

static KeyManagementClient createKmsClient(String kmsImpl) {
DynConstructors.Ctor<KeyManagementClient> ctor;
try {
ctor = DynConstructors.builder(KeyManagementClient.class).impl(kmsImpl).buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize KeyManagementClient, missing no-arg constructor for class %s",
kmsImpl),
e);
}

try {
return ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize kms client, %s does not implement KeyManagementClient interface",
kmsImpl),
e);
}
}

public static boolean useNativeEncryption(EncryptionKeyMetadata keyMetadata) {
return keyMetadata != null && keyMetadata instanceof KeyMetadata;
}
}
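The reflective loading above expects the configured class to implement KeyManagementClient and to expose a no-arg constructor. A self-contained sketch of the same DynConstructors pattern, using a hypothetical Codec interface so it compiles outside the encryption package:

import org.apache.iceberg.common.DynConstructors;

class ReflectiveLoading {
  interface Codec {}

  static Codec loadCodec(String impl) {
    DynConstructors.Ctor<Codec> ctor;
    try {
      // buildChecked() fails fast if the named class lacks an accessible no-arg constructor.
      ctor = DynConstructors.builder(Codec.class).impl(impl).buildChecked();
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("Missing no-arg constructor for " + impl, e);
    }
    // newInstance() throws ClassCastException if the class does not implement Codec,
    // which is what the second catch block in createKmsClient guards against.
    return ctor.newInstance();
  }
}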
@@ -40,8 +40,8 @@ class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
private static final org.apache.avro.Schema AVRO_SCHEMA_V1 =
AvroSchemaUtil.convert(SCHEMA_V1, KeyMetadata.class.getCanonicalName());

- private static final Map<Byte, Schema> schemaVersions = ImmutableMap.of(V1, SCHEMA_V1);
- private static final Map<Byte, org.apache.avro.Schema> avroSchemaVersions =
+ private static final Map<Byte, Schema> SCHEMA_VERSIONS = ImmutableMap.of(V1, SCHEMA_V1);
+ private static final Map<Byte, org.apache.avro.Schema> AVRO_SCHEMA_VERSIONS =
ImmutableMap.of(V1, AVRO_SCHEMA_V1);

private static final KeyMetadataEncoder KEY_METADATA_ENCODER = new KeyMetadataEncoder(V1);
@@ -61,18 +61,20 @@ class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
}

static Map<Byte, Schema> supportedSchemaVersions() {
- return schemaVersions;
+ return SCHEMA_VERSIONS;
}

static Map<Byte, org.apache.avro.Schema> supportedAvroSchemaVersions() {
- return avroSchemaVersions;
+ return AVRO_SCHEMA_VERSIONS;
}

- ByteBuffer encryptionKey() {
+ @Override
+ public ByteBuffer encryptionKey() {
return encryptionKey;
}

- ByteBuffer aadPrefix() {
+ @Override
+ public ByteBuffer aadPrefix() {
return aadPrefix;
}

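A round-trip sketch of the two EncryptionUtil factory methods shown earlier. It assumes, rather than the diff asserting it, that buffer() returns the serialized form that parseKeyMetadata accepts; the class name and key sizes are illustrative.

import java.nio.ByteBuffer;
import java.security.SecureRandom;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionUtil;

class KeyMetadataRoundTrip {
  static void demo() {
    byte[] key = new byte[16]; // e.g. a 128-bit data encryption key
    byte[] aad = new byte[8];
    SecureRandom random = new SecureRandom();
    random.nextBytes(key);
    random.nextBytes(aad);

    EncryptionKeyMetadata created =
        EncryptionUtil.createKeyMetadata(ByteBuffer.wrap(key), ByteBuffer.wrap(aad));
    EncryptionKeyMetadata parsed = EncryptionUtil.parseKeyMetadata(created.buffer());

    // The accessors made public in this change should survive the round trip.
    ByteBuffer recoveredKey = parsed.encryptionKey();
    ByteBuffer recoveredAad = parsed.aadPrefix();
    assert recoveredKey != null && recoveredAad != null;
  }
}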
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
@@ -40,6 +40,17 @@ public interface FileAppenderFactory<T> {
*/
FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);

/**
* Create a new {@link FileAppender}.
*
* @param outputFile an EncryptedOutputFile used to create an output stream.
* @param fileFormat File format.
* @return a newly created {@link FileAppender}
*/
default FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
return newAppender(outputFile.encryptingOutputFile(), fileFormat);
}

/**
* Create a new {@link DataWriter}.
*
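A calling-convention sketch (assumed caller code, not in the diff): passing the EncryptedOutputFile itself keeps key metadata visible to factories that support native encryption, while the default method above lets factories that only implement the OutputFile overload keep working unchanged.

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;

class AppenderCalls {
  static FileAppender<Record> open(FileAppenderFactory<Record> factory, EncryptedOutputFile file) {
    // The old pattern unwrapped the file first, factory.newAppender(file.encryptingOutputFile(), ...),
    // which dropped the encryption context before the factory could see it.
    return factory.newAppender(file, FileFormat.PARQUET);
  }
}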
@@ -118,7 +118,7 @@ public DataWriter<T> newDataWriter(

case PARQUET:
Parquet.DataWriteBuilder parquetBuilder =
- Parquet.writeData(outputFile)
+ Parquet.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
@@ -186,7 +186,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
- Parquet.writeDeletes(outputFile)
+ Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
@@ -254,7 +254,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
- Parquet.writeDeletes(outputFile)
+ Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
9 changes: 9 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -36,6 +36,8 @@
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
@@ -293,6 +295,13 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
}

if (deleteFile.keyMetadata() != null) {
EncryptionKeyMetadata keyMetadata =
EncryptionUtil.parseKeyMetadata(deleteFile.keyMetadata());
builder.withFileEncryptionKey(keyMetadata.encryptionKey());
builder.withAADPrefix(keyMetadata.aadPrefix());
}

return builder.build();

case ORC:
Expand Down
@@ -32,6 +32,7 @@
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
@@ -84,7 +85,14 @@

@Override
public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
return newAppender(EncryptedFiles.plainAsEncryptedOutput(outputFile), fileFormat);
}

@Override
public FileAppender<Record> newAppender(
EncryptedOutputFile encryptedOutputFile, FileFormat fileFormat) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
OutputFile outputFile = encryptedOutputFile.encryptingOutputFile();
try {
switch (fileFormat) {
case AVRO:
@@ -97,7 +105,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
.build();

case PARQUET:
- return Parquet.write(outputFile)
+ return Parquet.write(encryptedOutputFile)
.schema(schema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.setAll(config)
@@ -127,7 +135,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
public org.apache.iceberg.io.DataWriter<Record> newDataWriter(
EncryptedOutputFile file, FileFormat format, StructLike partition) {
return new org.apache.iceberg.io.DataWriter<>(
- newAppender(file.encryptingOutputFile(), format),
+ newAppender(file, format),
format,
file.encryptingOutputFile().location(),
spec,
@@ -146,10 +154,12 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
"Equality delete row schema shouldn't be null when creating equality-delete writer");

MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
OutputFile outputFile = file.encryptingOutputFile();

try {
switch (format) {
case AVRO:
- return Avro.writeDeletes(file.encryptingOutputFile())
+ return Avro.writeDeletes(outputFile)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.overwrite()
@@ -161,7 +171,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
.buildEqualityWriter();

case ORC:
- return ORC.writeDeletes(file.encryptingOutputFile())
+ return ORC.writeDeletes(outputFile)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
.overwrite()
@@ -174,7 +184,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
.buildEqualityWriter();

case PARQUET:
- return Parquet.writeDeletes(file.encryptingOutputFile())
+ return Parquet.writeDeletes(file)
.createWriterFunc(GenericParquetWriter::buildWriter)
.withPartition(partition)
.overwrite()
@@ -199,10 +209,12 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
public PositionDeleteWriter<Record> newPosDeleteWriter(
EncryptedOutputFile file, FileFormat format, StructLike partition) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
OutputFile outputFile = file.encryptingOutputFile();

try {
switch (format) {
case AVRO:
- return Avro.writeDeletes(file.encryptingOutputFile())
+ return Avro.writeDeletes(outputFile)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.overwrite()
@@ -213,7 +225,7 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
.buildPositionWriter();

case ORC:
- return ORC.writeDeletes(file.encryptingOutputFile())
+ return ORC.writeDeletes(outputFile)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
.overwrite()
@@ -224,7 +236,7 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
.buildPositionWriter();

case PARQUET:
- return Parquet.writeDeletes(file.encryptingOutputFile())
+ return Parquet.writeDeletes(file)
.createWriterFunc(GenericParquetWriter::buildWriter)
.withPartition(partition)
.overwrite()
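An end-to-end usage sketch for the updated factory (assumed calling code, not in the diff): in a real table the EncryptedOutputFile would come from the table's encryption manager; a plain local file is adapted here only to show the call shape, and the schema is illustrative.

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.types.Types;

class GenericAppenderExample {
  static void write(File target) throws IOException {
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    EncryptedOutputFile output = EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(target));

    GenericAppenderFactory factory = new GenericAppenderFactory(schema);
    // Uses the EncryptedOutputFile overload added in this commit.
    try (FileAppender<Record> appender = factory.newAppender(output, FileFormat.PARQUET)) {
      Record record = GenericRecord.create(schema);
      record.setField("id", 1L);
      appender.add(record);
    }
  }
}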
9 changes: 9 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -29,6 +29,8 @@
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -126,6 +128,13 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
parquet.reuseContainers();
}

if (task.file().keyMetadata() != null) {
EncryptionKeyMetadata keyMetadata =
EncryptionUtil.parseKeyMetadata(task.file().keyMetadata());
parquet.withFileEncryptionKey(keyMetadata.encryptionKey());
parquet.withAADPrefix(keyMetadata.aadPrefix());
}

return parquet.build();

case ORC:
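The same key-metadata propagation applies when opening an encrypted Parquet file directly; a sketch that mirrors the builder calls above (the projection and reader-function setup are standard generic-data reading, assumed rather than shown in this diff, and the helper class is illustrative):

import java.nio.ByteBuffer;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

class EncryptedParquetRead {
  static CloseableIterable<Record> open(InputFile input, Schema schema, ByteBuffer keyMetadataBuffer) {
    Parquet.ReadBuilder builder =
        Parquet.read(input)
            .project(schema)
            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema));

    if (keyMetadataBuffer != null) {
      // Same pattern as DeleteFilter and GenericReader: decode the manifest key metadata
      // and hand the key and AAD prefix to the Parquet reader.
      EncryptionKeyMetadata keyMetadata = EncryptionUtil.parseKeyMetadata(keyMetadataBuffer);
      builder.withFileEncryptionKey(keyMetadata.encryptionKey());
      builder.withAADPrefix(keyMetadata.aadPrefix());
    }

    return builder.build();
  }
}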
@@ -34,6 +34,7 @@
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.data.FlinkAvroWriter;
@@ -99,7 +100,15 @@ private RowType lazyPosDeleteFlinkSchema() {

@Override
public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
return newAppender(EncryptedFiles.plainAsEncryptedOutput(outputFile), format);
}

@Override
public FileAppender<RowData> newAppender(
EncryptedOutputFile encryptedOutputFile, FileFormat format) {
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
OutputFile outputFile = encryptedOutputFile.encryptingOutputFile();

try {
switch (format) {
case AVRO:
@@ -122,7 +131,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat forma
.build();

case PARQUET:
- return Parquet.write(outputFile)
+ return Parquet.write(encryptedOutputFile)
.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType))
.setAll(props)
.metricsConfig(metricsConfig)
@@ -142,7 +151,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat forma
public DataWriter<RowData> newDataWriter(
EncryptedOutputFile file, FileFormat format, StructLike partition) {
return new DataWriter<>(
- newAppender(file.encryptingOutputFile(), format),
+ newAppender(file, format),
format,
file.encryptingOutputFile().location(),
spec,
@@ -191,7 +200,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
.buildEqualityWriter();

case PARQUET:
- return Parquet.writeDeletes(outputFile.encryptingOutputFile())
+ return Parquet.writeDeletes(outputFile)
.createWriterFunc(
msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType))
.withPartition(partition)
@@ -250,7 +259,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
case PARQUET:
RowType flinkPosDeleteSchema =
FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
- return Parquet.writeDeletes(outputFile.encryptingOutputFile())
+ return Parquet.writeDeletes(outputFile)
.createWriterFunc(
msgType -> FlinkParquetWriters.buildWriter(flinkPosDeleteSchema, msgType))
.withPartition(partition)
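The Flink factory adopts the same pattern as GenericAppenderFactory: the plain OutputFile entry point wraps and delegates, and the EncryptedOutputFile entry point feeds the format builders. A pattern-only sketch (EveryFormatFactory is hypothetical, not an Iceberg class):

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;

abstract class EveryFormatFactory<T> {
  // Old entry point keeps working: wrap the plain file and delegate.
  public FileAppender<T> newAppender(OutputFile outputFile, FileFormat format) {
    return newAppender(EncryptedFiles.plainAsEncryptedOutput(outputFile), format);
  }

  // New entry point: format-specific builders receive the encrypted container, so
  // native encryption (e.g. Parquet) can pick up the key metadata when present.
  public abstract FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat format);
}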
