Deliver key metadata to parquet encryption #6762
Conversation
@@ -52,6 +55,7 @@
   protected CloseableIterable<ColumnarBatch> newBatchIterable(
       InputFile inputFile,
+      ByteBuffer keyMetadata,
Should this throw an exception if keyMetadata is non-null and the format is ORC?
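For concreteness, that guard could be a one-line precondition at the top of the ORC branch; the helper name and message below are illustrative, not from the PR:

  // uses org.apache.iceberg.relocated.com.google.common.base.Preconditions
  // Hypothetical guard: native encryption is not wired up for ORC, so
  // non-null key metadata in the ORC branch indicates a misconfiguration.
  static void checkOrcEncryption(ByteBuffer keyMetadata) {
    Preconditions.checkArgument(
        keyMetadata == null, "Native file encryption is not supported for ORC");
  }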
-  public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+  public FileAppender<InternalRow> newAppender(OutputFile outputFile, FileFormat format) {
+    return newAppender(
+        EncryptedFiles.encryptedOutput(outputFile, (EncryptionKeyMetadata) null), format);
Can you add a pass-through factory method to EncryptedFiles rather than constructing one here? I think it would be better to call EncryptedFiles.plainAsEncryptedOutput rather than passing null key metadata.
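A sketch of that pass-through factory; the method name comes from the comment, while the body is an assumption based on the existing encryptedOutput factory shown above:

  // In EncryptedFiles: wrap a plain OutputFile with null key metadata once,
  // so call sites don't repeat the (EncryptionKeyMetadata) null idiom.
  public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile outputFile) {
    return encryptedOutput(outputFile, (EncryptionKeyMetadata) null);
  }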
Overall this looks reasonable.
@ggershinsky, I think this is close but tests are failing. Can you update it?
Force-pushed from 2e0c52f to 2bc7789
Force-pushed from 14c1eab to 83e239b
  BaseEncryptedOutputFile(
      OutputFile encryptingOutputFile,
      EncryptionKeyMetadata keyMetadata,
      OutputFile rawOutputFile) {
Two output files look really suspicious to me. Maybe we would want to use a different implementation of the API to avoid that. Shouldn't this create the encrypting output file rather than having it passed in? Or is that an artifact from how the encryption manager works?
The encryption manager we use doesn't have to produce these files.
Or is that an artifact from how the encryption manager works?
Correct. This is a result of our previous discussion on encryption manager and Parquet - #6884 (comment):
"For native Parquet encryption, I think the EncryptedOutputFile and EncryptedInputFile classes would need to be able to return the underlying stream as well, so that encryption can be handled by Parquet."
Also, this class (BaseEncryptedOutputFile) has existed for a while now, and was designed to be a simple wrapper of an encrypting stream, its key metadata, and now its underlying stream.
I was considering this when looking at #6884. EncryptedOutputFile definitely needs to be able to return the underlying stream for Parquet encryption, but that doesn't mean that BaseEncryptedInputFile necessarily has to be used here.
Overall, I'm fine with this but it does look odd to have a wrapper that can supply either the raw output file or an encrypted one.
I've reverted the BaseEncryptedOutputFile class. There is a new implementation of the updated EncryptedOutputFile, residing inside the encryption manager that provides the native Parquet encryption.
@@ -49,4 +49,12 @@ static EncryptionKeyMetadata empty() {
   ByteBuffer buffer();

   EncryptionKeyMetadata copy();

+  default ByteBuffer encryptionKey() {
+    return null;
Why null instead of UnsupportedOperationException? Won't the caller need to throw an exception?
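The alternative would look roughly like this in EncryptionKeyMetadata (a sketch, not the merged API; the message text is illustrative):

  // Fail fast: implementations that don't carry a native encryption key
  // throw instead of handing the caller a null to check.
  default ByteBuffer encryptionKey() {
    throw new UnsupportedOperationException(
        getClass().getName() + " does not supply an encryption key");
  }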
  private EncryptionUtil() {}

  public static EncryptionKeyMetadata parseKeyMetadata(ByteBuffer metadataBuffer) {
Why doesn't this return KeyMetadata?
    return new KeyMetadata(key, aadPrefix);
  }

  static KeyManagementClient createKmsClient(String kmsImpl) {
As I mentioned above, I'd prefer to primarily use a kms-type and fall back to an impl class if needed.
Sure. This method will be called only if kms-type is custom.
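A sketch of that resolution order; the property keys ("encryption.kms-type", "encryption.kms-impl") and the "custom" value are assumptions for illustration, not the final names:

  import java.util.Map;
  import org.apache.iceberg.common.DynConstructors;

  static KeyManagementClient createKmsClient(Map<String, String> properties) {
    String kmsType = properties.get("encryption.kms-type"); // assumed property key
    if (kmsType != null && !"custom".equalsIgnoreCase(kmsType)) {
      // a well-known type would map to a built-in client here
      throw new UnsupportedOperationException("Unsupported kms-type: " + kmsType);
    }

    // "custom" falls back to dynamically loading the impl class
    String kmsImpl = properties.get("encryption.kms-impl"); // assumed property key
    DynConstructors.Ctor<KeyManagementClient> ctor =
        DynConstructors.builder(KeyManagementClient.class).impl(kmsImpl).build();
    return ctor.newInstance();
  }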
   @Override
   public InputFile decrypt(EncryptedInputFile encrypted) {
-    if (encrypted.keyMetadata().buffer() != null) {
+    if (encrypted.keyMetadata() != null && encrypted.keyMetadata().buffer() != null) {
       LOG.warn(
           "File encryption key metadata is present, but currently using PlaintextEncryptionManager.");
Rather than using a class name, can we change this to "but no encryption has been configured"?
@@ -118,13 +117,15 @@ public DataWriter<T> newDataWriter(

       case PARQUET:
         Parquet.DataWriteBuilder parquetBuilder =
-            Parquet.writeData(outputFile)
+            Parquet.writeData(file.rawOutputFile())
This is a behavior change. I think we need to check whether keyMetadata is a KeyMetadata in order to do this. If it's metadata that can be used for Parquet native encryption then we can use it. Otherwise we should fall back to using encryptingOutputFile() like before.
@@ -194,6 +195,8 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
             .withSpec(spec)
             .withPartition(partition)
             .withKeyMetadata(keyMetadata)
+            .withFileEncryptionKey(keyMetadata.encryptionKey())
Doesn't this also need to use the raw output file?
@@ -261,6 +264,8 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
             .withSpec(spec)
             .withPartition(partition)
             .withKeyMetadata(keyMetadata)
+            .withFileEncryptionKey(keyMetadata.encryptionKey())
Same here. Why doesn't this use the raw output file?
Also, should we have a Parquet.writeDeletes(EncryptedOutputFile)? It would work like this:

  public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
    if (file.keyMetadata() instanceof KeyMetadata) {
      KeyMetadata standardKeyMetadata = (KeyMetadata) file.keyMetadata();
      return writeDeletes(file.rawOutputFile())
          .withFileEncryptionKey(standardKeyMetadata.encryptionKey())
          .withAADPrefix(standardKeyMetadata.aadPrefix());
    } else {
      return writeDeletes(file.encryptingOutputFile());
    }
  }
@@ -293,6 +295,13 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
       builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
     }

+    if (deleteFile.keyMetadata() != null) {
+      EncryptionKeyMetadata keyMetadata =
+          EncryptionUtil.parseKeyMetadata(deleteFile.keyMetadata());
Cast or parse would be better here, too.
keyMetadata() is always a ByteBuffer here, so it has to be parsed.
Thanks for clarifying. That makes sense.
@@ -61,9 +66,12 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
       SparkDeleteFilter deleteFilter) {
     switch (format) {
       case PARQUET:
-        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
+        return newParquetIterable(
+            inputFile, keyMetadata, start, length, residual, idToConstant, deleteFilter);
Can this pass the EncryptedInputFile instead? That would avoid needing to pass both separately.
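The suggested signature might look like this; parameter names are assumptions and null handling for key metadata is omitted in this sketch:

  protected CloseableIterable<ColumnarBatch> newBatchIterable(
      EncryptedInputFile encryptedFile, // replaces the (InputFile, ByteBuffer) pair
      long start,
      long length,
      Expression residual,
      Map<Integer, ?> idToConstant,
      SparkDeleteFilter deleteFilter) {
    // the PARQUET branch reads the stream and its key metadata from one object
    return newParquetIterable(
        encryptedFile.encryptedInputFile(),
        encryptedFile.keyMetadata().buffer(),
        start, length, residual, idToConstant, deleteFilter);
  }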
-    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
-    Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encryptedFiles::iterator);
+    Stream<InputFile> inputFiles =
+        taskGroup.tasks().stream().flatMap(this::referencedFiles).map(this::toInputFile);
This should not change because it breaks usage of the bulk decrypt API. That doesn't matter for Iceberg standard encryption, but it would negatively affect people using their own EncryptionManager.
There's also no need to change how this works. It's fine for the decrypt method to return BaseEncryptedInputFile.encryptedInputFile. That doesn't actually create a decryption stream unless the InputFile is used.
Instead, we can either keep the EncryptedInputFile instances around or we can use a class that handles both APIs.
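A sketch of the shape being described, keeping the batch decrypt call; toEncryptedInputFile is a hypothetical helper that pairs each content file with its key metadata:

  // gather the task group's files first, then decrypt them in one batch call
  // so a custom EncryptionManager can contact its key server once
  List<EncryptedInputFile> encryptedFiles =
      taskGroup.tasks().stream()
          .flatMap(this::referencedFiles)
          .map(this::toEncryptedInputFile) // hypothetical helper
          .collect(Collectors.toList());

  Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encryptedFiles::iterator);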
I'll revert the changes in this class, and move the native decryption logic to the StandardEncryptionManager class.
@@ -58,7 +62,8 @@ protected CloseableIterable<InternalRow> newIterable(
       Map<Integer, ?> idToConstant) {
     switch (format) {
       case PARQUET:
-        return newParquetIterable(file, start, length, residual, projection, idToConstant);
+        return newParquetIterable(
+            file, encryptionKeyMetadata, start, length, residual, projection, idToConstant);
Prefer passing EncryptedInputFile here as well.
Force-pushed from 23bb56b to fbbecba
  /** Underlying output file for native encryption. */
  default OutputFile rawOutputFile() {
    return null;
Should this throw UnsupportedOperationException? That seems like a good idea to me so that we don't get an NPE when trying to use this with Parquet.
  }

  default ByteBuffer aadPrefix() {
    return null;
Do we handle a null aadPrefix or do we assume it is non-null?
Null is technically possible. It would indeed be safer to throw an unsupported exception here as well.
@@ -126,6 +128,13 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
       parquet.reuseContainers();
     }

+    if (task.file().keyMetadata() != null) {
+      EncryptionKeyMetadata keyMetadata =
This method in EncryptionUtil returns a package-private implementation, KeyMetadata. That leaks the class outside of the package but it isn't useful. I think that's why this added methods to EncryptionKeyMetadata. I think it's better to make KeyMetadata (or StandardKeyMetadata, as I renamed it in my last PR) public.
SGTM
@@ -118,7 +118,7 @@ public DataWriter<T> newDataWriter(

       case PARQUET:
         Parquet.DataWriteBuilder parquetBuilder =
-            Parquet.writeData(outputFile)
+            Parquet.writeData(file)
Since encryptingOutputFile will create an AesGcmOutputFile, I don't think it should be called unless it is going to be used. I think outputFile should be removed and the branches that pass an OutputFile (ORC and Avro) should call file.encryptingOutputFile() inline.
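Sketched for the Avro branch (ORC would be analogous); only the first builder call changes, the rest are as before:

  case AVRO:
    return Avro.write(file.encryptingOutputFile()) // AES GCM wrapper created only here
        .schema(schema) // remaining builder calls unchanged
        .build();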
SGTM
@@ -146,10 +154,12 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
         "Equality delete row schema shouldn't be null when creating equality-delete writer");

     MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
+    OutputFile outputFile = file.encryptingOutputFile();
Same as above, I think this should be called inline like it was before.
  public static WriteBuilder write(EncryptedOutputFile file) {
    if (EncryptionUtil.useNativeEncryption(file.keyMetadata())) {
      return write(file.rawOutputFile())
          .withFileEncryptionKey(file.keyMetadata().encryptionKey())
I think I suggested adding encryptionKey and aadPrefix to the base, but after looking at the castOrParse, I think it would be cleaner to use that here to get an instance of StandardKeyMetadata.
It may also be simpler if this used a direct instanceof check here. Then you could just cast directly. You can't do that with a method that may change doing the type check.

  if (file.keyMetadata() instanceof StandardKeyMetadata) {
    StandardKeyMetadata keyMetadata = (StandardKeyMetadata) file.keyMetadata();
    return write(file.plainOutputFile())
        .withFileEncryptionKey(keyMetadata.encryptionKey())
        .withAADPrefix(keyMetadata.aadPrefix());
  } else {
    return write(file.encryptingOutputFile());
  }
I think this also needs to call withKeyMetadata so that the key metadata instance is set.
After that, it's awkward that the key metadata is set separately from the encryption key and AAD prefix in this case, because the key metadata is not opaque. I think the solution is to do the instanceof check above in withKeyMetadata. That way, if the key metadata is StandardKeyMetadata then it will also configure the AAD prefix and key. If not, then setting them separately makes sense.
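That dispatch might look like this, assuming the builder keeps a keyMetadata field and the StandardKeyMetadata rename from the earlier PR:

  public WriteBuilder withKeyMetadata(EncryptionKeyMetadata keyMetadata) {
    this.keyMetadata = keyMetadata; // always stored, opaque or not

    if (keyMetadata instanceof StandardKeyMetadata) {
      // standard metadata is not opaque: also configure native encryption
      StandardKeyMetadata standard = (StandardKeyMetadata) keyMetadata;
      withFileEncryptionKey(standard.encryptionKey());
      withAADPrefix(standard.aadPrefix());
    }

    return this;
  }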
+1 to the first suggestion.
As for the second - this WriterBuilder class doesn't have a withKeyMetadata method (since it is a basic writer appender).
  }

  public static boolean useNativeEncryption(EncryptionKeyMetadata keyMetadata) {
    return keyMetadata != null && keyMetadata instanceof KeyMetadata;
The null check is redundant and can be removed.
@@ -175,15 +181,15 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFor
         .build();

       case AVRO:
-        return Avro.write(file)
+        return Avro.write(file.encryptingOutputFile())
@ggershinsky, it seems to me that with the AES GCM streams set up, Avro encryption would also work, right? In fact, although the StandardEncryptionManager is not used unless the format is Parquet, I think you can still request a format in individual writes. Those would work and use AES GCM stream encryption.
I think we need to change how we prevent Avro and ORC encryption. Instead of doing the check when creating the encryption manager, it should be done here instead. What I would do is add Avro.write(EncryptingOutputStream) and ORC.write(EncryptingOutputStream) and have them throw UnsupportedOperationException unless the key metadata is null. That will prevent AES GCM from being used until we want to add the feature.
We should also consider whether this will just work for Avro files!
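The suggested overload might look like this sketch for Avro (ORC would mirror it); the message text is illustrative:

  public static WriteBuilder write(EncryptedOutputFile file) {
    if (file.keyMetadata() != null && file.keyMetadata().buffer() != null) {
      // reject until AES GCM stream encryption is intentionally supported
      throw new UnsupportedOperationException("Avro encryption is not yet supported");
    }
    return write(file.encryptingOutputFile());
  }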
SGTM
Squashed commits:
use key and aadPrefix explicitly
util post-review changes
package-private KeyMetadata
fix key metadata method signatures
fix NPE
update PositionDeletesRowReader
use ALL_CAPS
move spark 3.3 to 3.4, flink 1.16 to 1.17
update spark source BaseReader
address review comments
clean up
update revapi
revert visibility limit
revert TableOperations
move plaintext manager changes to another pr
address review comments
revert BaseEncryptedOutputFile
Force-pushed from 27f2caa to e20cda6
Moved to main branch base via #9359