Skip to content

Commit

Permalink
API: Extend FileIO and add EncryptingFileIO. (#9592)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue authored Feb 19, 2024
1 parent 598552e commit 4c5208a
Show file tree
Hide file tree
Showing 8 changed files with 384 additions and 153 deletions.
214 changes: 214 additions & 0 deletions api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

/**
 * A {@link FileIO} that delegates plain file operations to a wrapped {@link FileIO} and uses an
 * {@link EncryptionManager} to decrypt or encrypt files.
 *
 * <p>Input files created from a {@link DataFile}, {@link DeleteFile}, or {@link ManifestFile} are
 * passed through the encryption manager only when the file carries key metadata; otherwise the
 * wrapped {@link FileIO} is used directly.
 */
public class EncryptingFileIO implements FileIO, Serializable {
  /**
   * Returns an {@link EncryptingFileIO} for the given {@link FileIO}.
   *
   * <p>If {@code io} is already an {@link EncryptingFileIO} it is returned as-is and {@code em} is
   * not used.
   *
   * @param io the file IO to wrap
   * @param em the encryption manager used when a new wrapper is created
   * @return {@code io} itself when it is already encrypting, otherwise a new wrapper
   */
  public static EncryptingFileIO create(FileIO io, EncryptionManager em) {
    if (io instanceof EncryptingFileIO) {
      return (EncryptingFileIO) io;
    }

    return new EncryptingFileIO(io, em);
  }

  private final FileIO io;
  private final EncryptionManager em;

  EncryptingFileIO(FileIO io, EncryptionManager em) {
    this.io = io;
    this.em = em;
  }

  /**
   * Decrypts a group of content files in one call to the encryption manager.
   *
   * @param files content files to open
   * @return a map from the location of each decrypted input file to that input file; when the same
   *     location appears more than once, the last entry is kept
   */
  public Map<String, InputFile> bulkDecrypt(Iterable<? extends ContentFile<?>> files) {
    Iterable<InputFile> decrypted = em.decrypt(Iterables.transform(files, this::wrap));

    ImmutableMap.Builder<String, InputFile> builder = ImmutableMap.builder();
    for (InputFile in : decrypted) {
      builder.put(in.location(), in);
    }

    return builder.buildKeepingLast();
  }

  /** Returns the encryption manager used by this file IO. */
  public EncryptionManager encryptionManager() {
    return em;
  }

  /** Creates an input file from the wrapped file IO without any decryption. */
  @Override
  public InputFile newInputFile(String path) {
    return io.newInputFile(path);
  }

  /** Creates an input file with a known length from the wrapped file IO without any decryption. */
  @Override
  public InputFile newInputFile(String path, long length) {
    return io.newInputFile(path, length);
  }

  /** Creates an input file for a data file, decrypting it when it has key metadata. */
  @Override
  public InputFile newInputFile(DataFile file) {
    return newInputFile((ContentFile<?>) file);
  }

  /** Creates an input file for a delete file, decrypting it when it has key metadata. */
  @Override
  public InputFile newInputFile(DeleteFile file) {
    return newInputFile((ContentFile<?>) file);
  }

  // shared implementation for data and delete files
  private InputFile newInputFile(ContentFile<?> file) {
    if (file.keyMetadata() != null) {
      return newDecryptingInputFile(
          file.path().toString(), file.fileSizeInBytes(), file.keyMetadata());
    } else {
      return newInputFile(file.path().toString(), file.fileSizeInBytes());
    }
  }

  /** Creates an input file for a manifest, decrypting it when it has key metadata. */
  @Override
  public InputFile newInputFile(ManifestFile manifest) {
    if (manifest.keyMetadata() != null) {
      return newDecryptingInputFile(manifest.path(), manifest.length(), manifest.keyMetadata());
    } else {
      return newInputFile(manifest.path(), manifest.length());
    }
  }

  /**
   * Creates a decrypting input file for the given path.
   *
   * @param path location of the encrypted file
   * @param buffer key metadata for the file; {@code null} is passed on as empty key metadata
   * @return the input file returned by the encryption manager
   */
  public InputFile newDecryptingInputFile(String path, ByteBuffer buffer) {
    return em.decrypt(wrap(io.newInputFile(path), buffer));
  }

  /**
   * Creates a decrypting input file for the given path and length.
   *
   * @param path location of the encrypted file
   * @param length length passed to the wrapped file IO when opening the file
   * @param buffer key metadata for the file; {@code null} is passed on as empty key metadata
   * @return the input file returned by the encryption manager
   */
  public InputFile newDecryptingInputFile(String path, long length, ByteBuffer buffer) {
    // TODO: is the length correct for the encrypted file? It may be the length of the plaintext
    // stream
    return em.decrypt(wrap(io.newInputFile(path, length), buffer));
  }

  /** Creates an output file from the wrapped file IO without any encryption. */
  @Override
  public OutputFile newOutputFile(String path) {
    return io.newOutputFile(path);
  }

  /**
   * Creates an output file at the given path and passes it to the encryption manager.
   *
   * @param path location of the file to write
   * @return the encrypted output file returned by the encryption manager
   */
  public EncryptedOutputFile newEncryptingOutputFile(String path) {
    OutputFile plainOutputFile = io.newOutputFile(path);
    return em.encrypt(plainOutputFile);
  }

  @Override
  public void deleteFile(String path) {
    io.deleteFile(path);
  }

  /**
   * Closes the wrapped file IO and, when it is {@link Closeable}, the encryption manager.
   *
   * <p>The encryption manager is closed even if closing the wrapped file IO throws, so that it is
   * not leaked. If both fail, the encryption manager failure is the one that propagates.
   *
   * @throws UncheckedIOException if closing the encryption manager fails with an IOException
   */
  @Override
  public void close() {
    try {
      io.close();
    } finally {
      if (em instanceof Closeable) {
        try {
          ((Closeable) em).close();
        } catch (IOException e) {
          throw new UncheckedIOException("Failed to close encryption manager", e);
        }
      }
    }
  }

  // pairs the encrypted input file for a content file with its key metadata
  private SimpleEncryptedInputFile wrap(ContentFile<?> file) {
    InputFile encryptedInputFile = io.newInputFile(file.path().toString(), file.fileSizeInBytes());
    return new SimpleEncryptedInputFile(encryptedInputFile, toKeyMetadata(file.keyMetadata()));
  }

  private static SimpleEncryptedInputFile wrap(InputFile encryptedInputFile, ByteBuffer buffer) {
    return new SimpleEncryptedInputFile(encryptedInputFile, toKeyMetadata(buffer));
  }

  // a null buffer maps to the shared empty key metadata instance
  private static EncryptionKeyMetadata toKeyMetadata(ByteBuffer buffer) {
    return buffer != null ? new SimpleKeyMetadata(buffer) : EmptyKeyMetadata.get();
  }

  /** Holds an encrypted input file together with its key metadata. */
  private static class SimpleEncryptedInputFile implements EncryptedInputFile {
    private final InputFile encryptedInputFile;
    private final EncryptionKeyMetadata keyMetadata;

    private SimpleEncryptedInputFile(
        InputFile encryptedInputFile, EncryptionKeyMetadata keyMetadata) {
      this.encryptedInputFile = encryptedInputFile;
      this.keyMetadata = keyMetadata;
    }

    @Override
    public InputFile encryptedInputFile() {
      return encryptedInputFile;
    }

    @Override
    public EncryptionKeyMetadata keyMetadata() {
      return keyMetadata;
    }
  }

  /** Key metadata backed by a byte buffer. */
  private static class SimpleKeyMetadata implements EncryptionKeyMetadata {
    private final ByteBuffer metadataBuffer;

    private SimpleKeyMetadata(ByteBuffer metadataBuffer) {
      this.metadataBuffer = metadataBuffer;
    }

    @Override
    public ByteBuffer buffer() {
      return metadataBuffer;
    }

    @Override
    public EncryptionKeyMetadata copy() {
      return new SimpleKeyMetadata(metadataBuffer.duplicate());
    }
  }

  /** Singleton key metadata whose buffer is {@code null}. */
  private static class EmptyKeyMetadata implements EncryptionKeyMetadata {
    private static final EmptyKeyMetadata INSTANCE = new EmptyKeyMetadata();

    private static EmptyKeyMetadata get() {
      return INSTANCE;
    }

    @Override
    public ByteBuffer buffer() {
      return null;
    }

    @Override
    public EncryptionKeyMetadata copy() {
      return this;
    }
  }
}
28 changes: 28 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import java.io.Closeable;
import java.io.Serializable;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* Pluggable module for reading, writing, and deleting files.
Expand All @@ -42,6 +46,30 @@ default InputFile newInputFile(String path, long length) {
return newInputFile(path);
}

/**
 * Get a {@link InputFile} instance to read bytes from the given data file.
 *
 * <p>This default implementation cannot decrypt, so it rejects files that carry key metadata.
 *
 * @param file a data file without key metadata
 * @return an input file for the data file's path and size
 * @throws IllegalArgumentException if the file has key metadata
 */
default InputFile newInputFile(DataFile file) {
  // Preconditions substitutes %s placeholders only; "{}" would be left in the message verbatim
  Preconditions.checkArgument(
      file.keyMetadata() == null,
      "Cannot decrypt data file: %s (use EncryptingFileIO)",
      file.path());
  // pass the known size so implementations that accept a length can use it
  return newInputFile(file.path().toString(), file.fileSizeInBytes());
}

/**
 * Get a {@link InputFile} instance to read bytes from the given delete file.
 *
 * <p>This default implementation cannot decrypt, so it rejects files that carry key metadata.
 *
 * @param file a delete file without key metadata
 * @return an input file for the delete file's path and size
 * @throws IllegalArgumentException if the file has key metadata
 */
default InputFile newInputFile(DeleteFile file) {
  // Preconditions substitutes %s placeholders only; "{}" would be left in the message verbatim
  Preconditions.checkArgument(
      file.keyMetadata() == null,
      "Cannot decrypt delete file: %s (use EncryptingFileIO)",
      file.path());
  // pass the known size so implementations that accept a length can use it
  return newInputFile(file.path().toString(), file.fileSizeInBytes());
}

/**
 * Get a {@link InputFile} instance to read bytes from the given manifest.
 *
 * <p>This default implementation cannot decrypt, so it rejects manifests that carry key metadata.
 *
 * @param manifest a manifest file without key metadata
 * @return an input file for the manifest's path and length
 * @throws IllegalArgumentException if the manifest has key metadata
 */
default InputFile newInputFile(ManifestFile manifest) {
  // Preconditions substitutes %s placeholders only; "{}" would be left in the message verbatim
  Preconditions.checkArgument(
      manifest.keyMetadata() == null,
      "Cannot decrypt manifest: %s (use EncryptingFileIO)",
      manifest.path());
  // pass the known length so implementations that accept a length can use it
  return newInputFile(manifest.path(), manifest.length());
}

/** Get a {@link OutputFile} instance to write bytes to the file at the given path. */
OutputFile newOutputFile(String path);

Expand Down
40 changes: 15 additions & 25 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static ManifestReader<DataFile> read(
manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s",
manifest);
InputFile file = newInputFile(io, manifest.path(), manifest.length());
InputFile file = newInputFile(io, manifest);
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(
file, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DATA_FILES);
Expand Down Expand Up @@ -181,7 +181,7 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
manifest.content() == ManifestContent.DELETES,
"Cannot read a data manifest with a DeleteManifestReader: %s",
manifest);
InputFile file = newInputFile(io, manifest.path(), manifest.length());
InputFile file = newInputFile(io, manifest);
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader<>(
file, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DELETE_FILES);
Expand Down Expand Up @@ -345,34 +345,24 @@ private static ManifestFile copyManifestInternal(
return writer.toManifestFile();
}

private static InputFile newInputFile(FileIO io, String path, long length) {
boolean enabled;

try {
enabled = cachingEnabled(io);
} catch (UnsupportedOperationException e) {
// There is an issue reading io.properties(). Disable caching.
enabled = false;
}

if (enabled) {
ContentCache cache = contentCache(io);
Preconditions.checkNotNull(
cache,
"ContentCache creation failed. Check that all manifest caching configurations has valid value.");
LOG.debug("FileIO-level cache stats: {}", CONTENT_CACHES.stats());
return cache.tryCache(io, path, length);
private static InputFile newInputFile(FileIO io, ManifestFile manifest) {
InputFile input = io.newInputFile(manifest);
if (cachingEnabled(io)) {
return contentCache(io).tryCache(input);
}

// caching is not enable for this io or caught RuntimeException.
return io.newInputFile(path, length);
return input;
}

static boolean cachingEnabled(FileIO io) {
return PropertyUtil.propertyAsBoolean(
io.properties(),
CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT);
try {
return PropertyUtil.propertyAsBoolean(
io.properties(),
CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT);
} catch (UnsupportedOperationException e) {
return false;
}
}

static long cacheDurationMs(FileIO io) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,35 @@
*/
package org.apache.iceberg.encryption;

import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;

/** An {@link EncryptedOutputFile} that can be used for format-native encryption. */
public interface NativeEncryptionOutputFile extends EncryptedOutputFile {
public interface NativeEncryptionOutputFile extends EncryptedOutputFile, OutputFile {
@Override
NativeEncryptionKeyMetadata keyMetadata();

/** An {@link OutputFile} instance for the underlying (plaintext) output stream. */
OutputFile plainOutputFile();

/** Creates a stream by delegating to the file returned by {@code encryptingOutputFile()}. */
@Override
default PositionOutputStream create() {
return encryptingOutputFile().create();
}

/**
 * Creates or overwrites a stream by delegating to the file returned by
 * {@code encryptingOutputFile()}.
 */
@Override
default PositionOutputStream createOrOverwrite() {
return encryptingOutputFile().createOrOverwrite();
}

/** Returns the location reported by the file returned by {@code encryptingOutputFile()}. */
@Override
default String location() {
return encryptingOutputFile().location();
}

/** Returns the input file produced by the file returned by {@code encryptingOutputFile()}. */
@Override
default InputFile toInputFile() {
return encryptingOutputFile().toInputFile();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public NativeEncryptionOutputFile encrypt(OutputFile plainOutput) {
@Override
public NativeEncryptionInputFile decrypt(EncryptedInputFile encrypted) {
  boolean alreadyNative = encrypted instanceof NativeEncryptionInputFile;
  if (!alreadyNative) {
    // the wrapper lazily parses key metadata in case the file is not an AES GCM stream
    return new StandardDecryptedInputFile(encrypted);
  }

  // nothing to wrap: hand the native input file back unchanged
  return (NativeEncryptionInputFile) encrypted;
}

Expand Down
Loading

0 comments on commit 4c5208a

Please sign in to comment.