
Deliver key metadata to parquet encryption #6762

Closed
wants to merge 2 commits

Conversation

ggershinsky (Contributor):

No description provided.

ggershinsky changed the title from "MR: Deliver key metadata to parquet readers" to "Deliver key metadata to parquet readers" on Feb 7, 2023
github-actions bot added the core label on Feb 20, 2023
@@ -52,6 +55,7 @@

protected CloseableIterable<ColumnarBatch> newBatchIterable(
InputFile inputFile,
ByteBuffer keyMetadata,
rdblue (Contributor):

Should this throw an exception if keyMetadata is non-null and the format is ORC?

- public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+ public FileAppender<InternalRow> newAppender(OutputFile outputFile, FileFormat format) {
+   return newAppender(
+       EncryptedFiles.encryptedOutput(outputFile, (EncryptionKeyMetadata) null), format);
rdblue (Contributor):

Can you add a pass-through factory method to EncryptedFiles rather than constructing one here? I think it would be better to call EncryptedFiles.plainAsEncryptedOutput rather than passing null key metadata.
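A minimal sketch of such a factory (the method name comes from the comment above; the constructor call is an assumption based on the existing BaseEncryptedOutputFile, not the PR's actual code):

  // Hypothetical pass-through factory: wraps a plain output file so callers
  // never construct an EncryptedOutputFile with null key metadata themselves.
  public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile outputFile) {
    return new BaseEncryptedOutputFile(outputFile, EncryptionKeyMetadata.empty());
  }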

rdblue (Contributor) left a review:

Overall this looks reasonable.

rdblue (Contributor) commented May 22, 2023:

@ggershinsky, I think this is close but tests are failing. Can you update it?

github-actions bot added the API label on Jun 8, 2023
ggershinsky force-pushed the deliver-key-metadata branch 2 times, most recently from 2e0c52f to 2bc7789 on June 11, 2023 04:49
ggershinsky force-pushed the deliver-key-metadata branch from 14c1eab to 83e239b on July 24, 2023 05:16
ggershinsky changed the title from "Deliver key metadata to parquet readers" to "Deliver key metadata to parquet encryption" on Jul 31, 2023
BaseEncryptedOutputFile(
OutputFile encryptingOutputFile,
EncryptionKeyMetadata keyMetadata,
OutputFile rawOutputFile) {
rdblue (Contributor), Jul 31, 2023:

Two output files look really suspicious to me. Maybe we would want to use a different implementation of the API to avoid that. Shouldn't this create the encrypting output file rather than having it passed in? Or is that an artifact from how the encryption manager works?

The encryption manager we use doesn't have to produce these files.

ggershinsky (Contributor, author), Aug 2, 2023:

Or is that an artifact from how the encryption manager works?

Correct. This is a result of our previous discussion on encryption manager and Parquet -
#6884 (comment)
"For native Parquet encryption, I think the EncryptedOutputFile and EncryptedInputFile classes would need to be able to return the underlying stream as well, so that encryption can be handled by Parquet."

Also, this class (BaseEncryptedOutputFile) has existed for a while now; it was designed as a simple wrapper around an encrypting stream, its key metadata, and now its underlying stream.

rdblue (Contributor):

I was considering this when looking at #6884. EncryptedOutputFile definitely needs to be able to return the underlying stream for Parquet encryption, but that doesn't mean that BaseEncryptedOutputFile necessarily has to be used here.

Overall, I'm fine with this but it does look odd to have a wrapper that can supply either the raw output file or an encrypted one.

ggershinsky (Contributor, author):

I've reverted the BaseEncryptedOutputFile class. There is a new implementation of the updated EncryptedOutputFile, residing inside the encryption manager that provides the native Parquet encryption.

@@ -49,4 +49,12 @@ static EncryptionKeyMetadata empty() {
ByteBuffer buffer();

EncryptionKeyMetadata copy();

default ByteBuffer encryptionKey() {
return null;
rdblue (Contributor):

Why null instead of UnsupportedOperationException? Won't the caller need to throw an exception?
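A sketch of the failing-fast alternative being suggested (the exception message is assumed):

  // Assumed: throw instead of returning null so misuse surfaces immediately,
  // rather than forcing every caller to null-check and throw on its own.
  default ByteBuffer encryptionKey() {
    throw new UnsupportedOperationException(
        "Encryption key is not supported by " + getClass().getName());
  }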


private EncryptionUtil() {}

public static EncryptionKeyMetadata parseKeyMetadata(ByteBuffer metadataBuffer) {
rdblue (Contributor):

Why doesn't this return KeyMetadata?

return new KeyMetadata(key, aadPrefix);
}

static KeyManagementClient createKmsClient(String kmsImpl) {
rdblue (Contributor):

As I mentioned above, I'd prefer to primarily use a kms-type and fall back to an impl class if needed.

ggershinsky (Contributor, author):

Sure. This method will be called only if kms-type is custom
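For illustration only, a rough sketch of that custom fallback (the reflective loading and no-arg constructor are assumptions, not the PR's code):

  // Hypothetical: instantiate a user-supplied KeyManagementClient by class
  // name when kms-type is set to "custom".
  static KeyManagementClient createKmsClient(String kmsImpl) {
    try {
      return (KeyManagementClient)
          Class.forName(kmsImpl).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate KMS client " + kmsImpl, e);
    }
  }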

@Override
public InputFile decrypt(EncryptedInputFile encrypted) {
- if (encrypted.keyMetadata().buffer() != null) {
+ if (encrypted.keyMetadata() != null && encrypted.keyMetadata().buffer() != null) {
LOG.warn(
"File encryption key metadata is present, but currently using PlaintextEncryptionManager.");
rdblue (Contributor):

Rather than using a class name, can we change this to "but no encryption has been configured"

@@ -118,13 +117,15 @@ public DataWriter<T> newDataWriter(

case PARQUET:
Parquet.DataWriteBuilder parquetBuilder =
- Parquet.writeData(outputFile)
+ Parquet.writeData(file.rawOutputFile())
rdblue (Contributor):

This is a behavior change. I think we need to check whether keyMetadata is a KeyMetadata in order to do this. If it's metadata that can be used for Parquet native encryption then we can use it. Otherwise we should fall back to using encryptingOutputFile() like before.
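A sketch of the suggested check (mirroring the writeDeletes example later in this review; not the PR's final code):

  // Assumed: route to the raw output file only when the key metadata can
  // drive native Parquet encryption; otherwise keep the previous behavior.
  Parquet.DataWriteBuilder parquetBuilder;
  if (file.keyMetadata() instanceof KeyMetadata) {
    parquetBuilder = Parquet.writeData(file.rawOutputFile());
  } else {
    parquetBuilder = Parquet.writeData(file.encryptingOutputFile());
  }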

@@ -194,6 +195,8 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withFileEncryptionKey(keyMetadata.encryptionKey())
rdblue (Contributor):

Doesn't this also need to use the raw output file?

@@ -261,6 +264,8 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withFileEncryptionKey(keyMetadata.encryptionKey())
rdblue (Contributor):

Same here. Why doesn't this use the raw output file?

Also, should we have a Parquet.writeDeletes(EncryptedOutputFile)? It would work like this:

  public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
    if (file.keyMetadata() instanceof KeyMetadata) {
      KeyMetadata standardKeyMetadata = (KeyMetadata) file.keyMetadata();
      return writeDeletes(file.rawOutputFile())
          .withFileEncryptionKey(standardKeyMetadata.encryptionKey())
          .withAADPrefix(standardKeyMetadata.aadPrefix());
    } else {
      return writeDeletes(file.encryptingOutputFile());
    }
  }

@@ -293,6 +295,13 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
}

if (deleteFile.keyMetadata() != null) {
EncryptionKeyMetadata keyMetadata =
EncryptionUtil.parseKeyMetadata(deleteFile.keyMetadata());
rdblue (Contributor):

Cast or parse would be better here, too.

ggershinsky (Contributor, author):

keyMetadata() is always a ByteBuffer here, so it has to be parsed.

rdblue (Contributor):

Thanks for clarifying. That makes sense.

@@ -61,9 +66,12 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
SparkDeleteFilter deleteFilter) {
switch (format) {
case PARQUET:
- return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
+ return newParquetIterable(
+     inputFile, keyMetadata, start, length, residual, idToConstant, deleteFilter);
rdblue (Contributor):

Can this pass the EncryptedInputFile instead? That would avoid needing to pass both separately.
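One possible shape of the suggested signature, so the file and its key metadata travel together (parameter names are taken from the diff; the body is assumed):

  protected CloseableIterable<ColumnarBatch> newBatchIterable(
      EncryptedInputFile encryptedFile,
      long start,
      long length,
      Expression residual,
      Map<Integer, ?> idToConstant,
      SparkDeleteFilter deleteFilter) {
    // Unbundle the pair only at the call into the Parquet reader.
    return newParquetIterable(
        encryptedFile.encryptedInputFile(),
        encryptedFile.keyMetadata().buffer(),
        start, length, residual, idToConstant, deleteFilter);
  }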

- // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
- Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encryptedFiles::iterator);
+ Stream<InputFile> inputFiles =
+     taskGroup.tasks().stream().flatMap(this::referencedFiles).map(this::toInputFile);
rdblue (Contributor):

This should not change because it breaks usage of the bulk decrypt API. That doesn't matter for Iceberg standard encryption, but it would negatively affect people using their own EncryptionManager.

There's also no need to change how this works. It's fine for the decrypt method to return BaseEncryptedInputFile.encryptedInputFile. That doesn't actually create a decryption stream unless the InputFile is used.

Instead, we can either keep the EncryptedInputFile instances around or we can use a class that handles both APIs.

ggershinsky (Contributor, author):

I'll revert the changes in this class, and move the native decryption logic to the StandardEncryptionManager class.

@@ -58,7 +62,8 @@ protected CloseableIterable<InternalRow> newIterable(
Map<Integer, ?> idToConstant) {
switch (format) {
case PARQUET:
- return newParquetIterable(file, start, length, residual, projection, idToConstant);
+ return newParquetIterable(
+     file, encryptionKeyMetadata, start, length, residual, projection, idToConstant);
rdblue (Contributor):

Prefer passing EncryptedInputFile here as well.

ggershinsky (Contributor, author):

@rdblue Thanks for the review. This PR and the related #6884 and #5544 are updated to address the comments.


/** Underlying output file for native encryption. */
default OutputFile rawOutputFile() {
return null;
rdblue (Contributor):

Should this throw UnsupportedOperationException? That seems like a good idea to me so that we don't get NPE when trying to use this with Parquet.

}

default ByteBuffer aadPrefix() {
return null;
rdblue (Contributor):

Do we handle a null aadPrefix or do we assume it is non-null?

ggershinsky (Contributor, author):

Null is technically possible. It would indeed be safer to throw an UnsupportedOperationException here as well.
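A sketch of the safer defaults agreed on here (the exception messages are assumed):

  /** Underlying output file for native encryption. */
  default OutputFile rawOutputFile() {
    // Assumed: fail fast so a non-native implementation can't silently return null
    throw new UnsupportedOperationException("Raw output file is not provided");
  }

  default ByteBuffer aadPrefix() {
    // Assumed: the same fail-fast treatment for the AAD prefix
    throw new UnsupportedOperationException("AAD prefix is not provided");
  }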

@@ -126,6 +128,13 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
parquet.reuseContainers();
}

if (task.file().keyMetadata() != null) {
EncryptionKeyMetadata keyMetadata =
rdblue (Contributor):

This method in EncryptionUtil returns a package-private implementation, KeyMetadata. That leaks the class outside of the package, where it isn't useful. I think that's why this PR added methods to EncryptionKeyMetadata. I think it's better to make KeyMetadata (or StandardKeyMetadata, as I renamed it in my last PR) public.

ggershinsky (Contributor, author):

SGTM

@@ -118,7 +118,7 @@ public DataWriter<T> newDataWriter(

case PARQUET:
Parquet.DataWriteBuilder parquetBuilder =
- Parquet.writeData(outputFile)
+ Parquet.writeData(file)
rdblue (Contributor), Dec 11, 2023:

Since encryptingOutputFile will create an AesGcmOutputFile, I don't think it should be called unless it is going to be used. I think outputFile should be removed and the branches that pass an OutputFile (ORC and Avro) should call file.encryptingOutputFile() inline.

ggershinsky (Contributor, author):

SGTM

@@ -146,10 +154,12 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
"Equality delete row schema shouldn't be null when creating equality-delete writer");

MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
OutputFile outputFile = file.encryptingOutputFile();
rdblue (Contributor):

Same as above, I think this should be called inline like it was before.

public static WriteBuilder write(EncryptedOutputFile file) {
if (EncryptionUtil.useNativeEncryption(file.keyMetadata())) {
return write(file.rawOutputFile())
.withFileEncryptionKey(file.keyMetadata().encryptionKey())
rdblue (Contributor), Dec 11, 2023:

I think I suggested adding encryptionKey and aadPrefix to the base, but after looking at the castOrParse, I think it would be cleaner to use that here to get an instance of StandardKeyMetadata.

It may also be simpler to use a direct instanceof check here; then you could just cast directly. You can't do that when the type check is hidden behind a helper method that may change.

if (file.keyMetadata() instanceof StandardKeyMetadata) {
  StandardKeyMetadata keyMetadata = (StandardKeyMetadata) file.keyMetadata();
  return write(file.plainOutputFile())
      .withFileEncryptionKey(keyMetadata.encryptionKey())
      .withAADPrefix(keyMetadata.aadPrefix())
} else {
  return write(file.encryptingOutputFile());
}

rdblue (Contributor):

I think this also needs to call withKeyMetadata so that the key metadata instance is set.

After that, it's awkward that the key metadata is set separately from the encryption key and AAD prefix in this case, because the key metadata is not opaque. I think the solution is to do the instanceof check above in withKeyMetadata. That way, if the key metadata is StandardKeyMetadata, it will also configure the AAD prefix and key. If not, then setting them separately makes sense.
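For a builder that does expose withKeyMetadata alongside the key/AAD setters, the suggestion might look roughly like this (the field name and return type are assumptions):

  public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
    this.keyMetadata = metadata;

    // Assumed: when the opaque metadata is really the standard implementation,
    // also wire up the native Parquet encryption key and AAD prefix.
    if (metadata instanceof StandardKeyMetadata) {
      StandardKeyMetadata standard = (StandardKeyMetadata) metadata;
      withFileEncryptionKey(standard.encryptionKey());
      withAADPrefix(standard.aadPrefix());
    }

    return this;
  }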

ggershinsky (Contributor, author):

+1 to the first suggestion.

As for the second - this WriterBuilder class doesn't have a withKeyMetadata method (since it is a basic writer appender).

}

public static boolean useNativeEncryption(EncryptionKeyMetadata keyMetadata) {
return keyMetadata != null && keyMetadata instanceof KeyMetadata;
rdblue (Contributor):

The null check is redundant and can be removed.
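The simplified check (instanceof already evaluates to false for a null reference):

  public static boolean useNativeEncryption(EncryptionKeyMetadata keyMetadata) {
    return keyMetadata instanceof KeyMetadata;
  }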

@@ -175,15 +181,15 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFor
.build();

case AVRO:
- return Avro.write(file)
+ return Avro.write(file.encryptingOutputFile())
rdblue (Contributor):

@ggershinsky, it seems to me that with the AES GCM streams set up, Avro encryption would also work, right? In fact, although the StandardEncryptionManager is not used unless the format is Parquet, I think you can still request a format in individual writes. Those would work and use AES GCM stream encryption.

I think we need to change how we prevent Avro and ORC encryption. Instead of doing the check when creating the encryption manager, it should be done here. What I would do is add Avro.write(EncryptedOutputFile) and ORC.write(EncryptedOutputFile) and have them throw UnsupportedOperationException unless the key metadata is null. That will prevent AES GCM from being used until we want to add the feature.

We should also consider whether this will just work for Avro files!
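A sketch of that guard (the overload placement and message are assumptions):

  // Hypothetical Avro.write overload: reject key metadata until AES GCM
  // stream encryption for Avro is intentionally supported.
  public static WriteBuilder write(EncryptedOutputFile file) {
    if (file.keyMetadata() != null && file.keyMetadata().buffer() != null) {
      throw new UnsupportedOperationException("Avro encryption is not yet supported");
    }

    return write(file.encryptingOutputFile());
  }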

ggershinsky (Contributor, author):

SGTM

Commits:
- use key and aadPrefix explicitly
- util
- post-review changes
- package-private KeyMetadata
- fix key metadata method signatures
- fix NPE
- update PositionDeletesRowReader
- use ALL_CAPS
- move spark 3.3 to 3.4, flink 1.16 to 1.17
- update spark source BaseReader
- address review comments
- clean up
- update revapi
- revert visibility limit
- revert TableOperations
- move plaintext manager changes to another pr
- address review comments
- revert BaseEncryptedOutputFile
ggershinsky (Contributor, author):

Moved to main branch base via #9359

ggershinsky deleted the deliver-key-metadata branch on January 8, 2024 05:25