From b968740aede788abf46d4d77289d27a8fe6eadb9 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 30 Jan 2024 17:06:02 -0800 Subject: [PATCH 1/9] API: Extend FileIO and add EncryptingFileIO. --- .../iceberg/encryption/EncryptingFileIO.java | 210 ++++++++++++++++++ .../java/org/apache/iceberg/io/FileIO.java | 28 +++ .../NativeEncryptionOutputFile.java | 24 +- .../encryption/StandardEncryptionManager.java | 4 + .../iceberg/spark/source/BaseReader.java | 23 +- 5 files changed, 270 insertions(+), 19 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java new file mode 100644 index 000000000000..82670232a8a7 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +public class EncryptingFileIO implements FileIO, Serializable { + public static EncryptingFileIO create(FileIO io, EncryptionManager em) { + return new EncryptingFileIO(io, em); + } + + private final FileIO io; + private final EncryptionManager em; + + EncryptingFileIO(FileIO io, EncryptionManager em) { + this.io = io; + this.em = em; + } + + public Map bulkDecrypt(Iterable> files) { + Iterable decrypted = em.decrypt(Iterables.transform(files, this::wrap)); + + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (InputFile in : decrypted) { + builder.put(in.location(), in); + } + + return builder.buildKeepingLast(); + } + + public EncryptionManager encryptionManager() { + return em; + } + + @Override + public InputFile newInputFile(String path) { + return io.newInputFile(path); + } + + @Override + public InputFile newInputFile(String path, long length) { + return io.newInputFile(path, length); + } + + @Override + public InputFile newInputFile(DataFile file) { + return newInputFile((ContentFile) file); + } + + @Override + public InputFile newInputFile(DeleteFile file) { + return newInputFile((ContentFile) file); + } + + private InputFile newInputFile(ContentFile file) { + if (file.keyMetadata() != null) { + return newDecryptingInputFile( + file.path().toString(), file.fileSizeInBytes(), file.keyMetadata()); + } else { + return newInputFile(file.path().toString(), file.fileSizeInBytes()); + } + } + + @Override + public InputFile newInputFile(ManifestFile manifest) { + if (manifest.keyMetadata() != null) { + return newDecryptingInputFile(manifest.path(), manifest.length(), manifest.keyMetadata()); + } else { + return newInputFile(manifest.path(), manifest.length()); + } + } + + public InputFile newDecryptingInputFile(String path, ByteBuffer buffer) { + return em.decrypt(wrap(io.newInputFile(path), buffer)); + } + + public InputFile newDecryptingInputFile(String path, long length, ByteBuffer buffer) { + // TODO: is the length correct for the encrypted file? It may be the length of the plaintext + // stream + return em.decrypt(wrap(io.newInputFile(path, length), buffer)); + } + + @Override + public OutputFile newOutputFile(String path) { + return io.newOutputFile(path); + } + + public EncryptedOutputFile newEncryptingOutputFile(String path) { + OutputFile plainOutputFile = io.newOutputFile(path); + return em.encrypt(plainOutputFile); + } + + @Override + public void deleteFile(String path) { + io.deleteFile(path); + } + + @Override + public void close() { + io.close(); + + if (em instanceof Closeable) { + try { + ((Closeable) em).close(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close encryption manager", e); + } + } + } + + private SimpleEncryptedInputFile wrap(ContentFile file) { + InputFile encryptedInputFile = io.newInputFile(file.path().toString(), file.fileSizeInBytes()); + return new SimpleEncryptedInputFile(encryptedInputFile, toKeyMetadata(file.keyMetadata())); + } + + private static SimpleEncryptedInputFile wrap(InputFile encryptedInputFile, ByteBuffer buffer) { + return new SimpleEncryptedInputFile(encryptedInputFile, toKeyMetadata(buffer)); + } + + private static EncryptionKeyMetadata toKeyMetadata(ByteBuffer buffer) { + return buffer != null ? new SimpleKeyMetadata(buffer) : EmptyKeyMetadata.get(); + } + + private static class SimpleEncryptedInputFile implements EncryptedInputFile { + private final InputFile encryptedInputFile; + private final EncryptionKeyMetadata keyMetadata; + + private SimpleEncryptedInputFile( + InputFile encryptedInputFile, EncryptionKeyMetadata keyMetadata) { + this.encryptedInputFile = encryptedInputFile; + this.keyMetadata = keyMetadata; + } + + @Override + public InputFile encryptedInputFile() { + return encryptedInputFile; + } + + @Override + public EncryptionKeyMetadata keyMetadata() { + return keyMetadata; + } + } + + private static class SimpleKeyMetadata implements EncryptionKeyMetadata { + private final ByteBuffer metadataBuffer; + + private SimpleKeyMetadata(ByteBuffer metadataBuffer) { + this.metadataBuffer = metadataBuffer; + } + + @Override + public ByteBuffer buffer() { + return metadataBuffer; + } + + @Override + public EncryptionKeyMetadata copy() { + return new SimpleKeyMetadata(metadataBuffer.duplicate()); + } + } + + private static class EmptyKeyMetadata implements EncryptionKeyMetadata { + private static final EmptyKeyMetadata INSTANCE = new EmptyKeyMetadata(); + + private static EmptyKeyMetadata get() { + return INSTANCE; + } + + @Override + public ByteBuffer buffer() { + return null; + } + + @Override + public EncryptionKeyMetadata copy() { + return this; + } + } +} diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index 928f09eb20b6..503b7edb397f 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -21,6 +21,10 @@ import java.io.Closeable; import java.io.Serializable; import java.util.Map; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** * Pluggable module for reading, writing, and deleting files. @@ -42,6 +46,30 @@ default InputFile newInputFile(String path, long length) { return newInputFile(path); } + default InputFile newInputFile(DataFile file) { + Preconditions.checkArgument( + file.keyMetadata() == null, + "Cannot decrypt data file: {} (use DecryptingFileIO)", + file.path()); + return newInputFile(file.path().toString()); + } + + default InputFile newInputFile(DeleteFile file) { + Preconditions.checkArgument( + file.keyMetadata() == null, + "Cannot decrypt delete file: {} (use DecryptingFileIO)", + file.path()); + return newInputFile(file.path().toString()); + } + + default InputFile newInputFile(ManifestFile manifest) { + Preconditions.checkArgument( + manifest.keyMetadata() == null, + "Cannot decrypt manifest: {} (use DecryptingFileIO)", + manifest.path()); + return newInputFile(manifest.path()); + } + /** Get a {@link OutputFile} instance to write bytes to the file at the given path. */ OutputFile newOutputFile(String path); diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java index 0d0d5da8a677..6d095252cb99 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java @@ -18,13 +18,35 @@ */ package org.apache.iceberg.encryption; +import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; /** An {@link EncryptedOutputFile} that can be used for format-native encryption. */ -public interface NativeEncryptionOutputFile extends EncryptedOutputFile { +public interface NativeEncryptionOutputFile extends EncryptedOutputFile, OutputFile { @Override NativeEncryptionKeyMetadata keyMetadata(); /** An {@link OutputFile} instance for the underlying (plaintext) output stream. */ OutputFile plainOutputFile(); + + @Override + default PositionOutputStream create() { + return encryptingOutputFile().create(); + } + + @Override + default PositionOutputStream createOrOverwrite() { + return encryptingOutputFile().createOrOverwrite(); + } + + @Override + default String location() { + return encryptingOutputFile().location(); + } + + @Override + default InputFile toInputFile() { + return encryptingOutputFile().toInputFile(); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 185f4d6f81bb..119d2a5f9ae2 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -61,6 +61,10 @@ public NativeEncryptionOutputFile encrypt(OutputFile plainOutput) { @Override public NativeEncryptionInputFile decrypt(EncryptedInputFile encrypted) { // this input file will lazily parse key metadata in case the file is not an AES GCM stream. + if (encrypted instanceof NativeEncryptionInputFile) { + return (NativeEncryptionInputFile) encrypted; + } + return new StandardDecryptedInputFile(encrypted); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index c2b3e7c2dc56..f67171708bb2 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -46,14 +46,11 @@ import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.data.DeleteLoader; import org.apache.iceberg.deletes.DeleteCounter; -import org.apache.iceberg.encryption.EncryptedFiles; -import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkExecutorCache; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.Type; @@ -184,25 +181,15 @@ protected InputFile getInputFile(String location) { private Map inputFiles() { if (lazyInputFiles == null) { - Stream encryptedFiles = - taskGroup.tasks().stream().flatMap(this::referencedFiles).map(this::toEncryptedInputFile); - - // decrypt with the batch call to avoid multiple RPCs to a key server, if possible - Iterable decryptedFiles = table.encryption().decrypt(encryptedFiles::iterator); - - Map files = Maps.newHashMapWithExpectedSize(taskGroup.tasks().size()); - decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted)); - this.lazyInputFiles = ImmutableMap.copyOf(files); + this.lazyInputFiles = + EncryptingFileIO.create(table().io(), table().encryption()) + .bulkDecrypt( + () -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator()); } return lazyInputFiles; } - private EncryptedInputFile toEncryptedInputFile(ContentFile file) { - InputFile inputFile = table.io().newInputFile(file.path().toString()); - return EncryptedFiles.encryptedInput(inputFile, file.keyMetadata()); - } - protected Map constantsMap(ContentScanTask task, Schema readSchema) { if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) { StructType partitionType = Partitioning.partitionType(table); From a3432f40cb9ee45dc76497bb4ae3f615e9436cef Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 7 Feb 2024 16:16:19 -0800 Subject: [PATCH 2/9] Use new FileIO mehtods in ManifestFiles, fix ContentCache. --- .../org/apache/iceberg/ManifestFiles.java | 32 ++-- .../org/apache/iceberg/io/ContentCache.java | 150 +++++++----------- 2 files changed, 69 insertions(+), 113 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index c23ab667a41b..61dd517365f6 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -126,7 +126,7 @@ public static ManifestReader read( manifest.content() == ManifestContent.DATA, "Cannot read a delete manifest with a ManifestReader: %s", manifest); - InputFile file = newInputFile(io, manifest.path(), manifest.length()); + InputFile file = newInputFile(io, manifest); InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); return new ManifestReader<>( file, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DATA_FILES); @@ -181,7 +181,7 @@ public static ManifestReader readDeleteManifest( manifest.content() == ManifestContent.DELETES, "Cannot read a data manifest with a DeleteManifestReader: %s", manifest); - InputFile file = newInputFile(io, manifest.path(), manifest.length()); + InputFile file = newInputFile(io, manifest); InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); return new ManifestReader<>( file, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DELETE_FILES); @@ -345,34 +345,24 @@ private static ManifestFile copyManifestInternal( return writer.toManifestFile(); } - private static InputFile newInputFile(FileIO io, String path, long length) { - boolean enabled; - - try { - enabled = cachingEnabled(io); - } catch (UnsupportedOperationException e) { - // There is an issue reading io.properties(). Disable caching. - enabled = false; - } - - if (enabled) { - ContentCache cache = contentCache(io); - Preconditions.checkNotNull( - cache, - "ContentCache creation failed. Check that all manifest caching configurations has valid value."); - LOG.debug("FileIO-level cache stats: {}", CONTENT_CACHES.stats()); - return cache.tryCache(io, path, length); + private static InputFile newInputFile(FileIO io, ManifestFile manifest) { + InputFile input = io.newInputFile(manifest); + if (cachingEnabled(io)) { + return contentCache(io).tryCache(input); } - // caching is not enable for this io or caught RuntimeException. - return io.newInputFile(path, length); + return input; } static boolean cachingEnabled(FileIO io) { + try { return PropertyUtil.propertyAsBoolean( io.properties(), CatalogProperties.IO_MANIFEST_CACHE_ENABLED, CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT); + } catch (UnsupportedOperationException e) { + return false; + } } static long cacheDurationMs(FileIO io) { diff --git a/core/src/main/java/org/apache/iceberg/io/ContentCache.java b/core/src/main/java/org/apache/iceberg/io/ContentCache.java index c999f3f333f6..f7205c0f44c2 100644 --- a/core/src/main/java/org/apache/iceberg/io/ContentCache.java +++ b/core/src/main/java/org/apache/iceberg/io/ContentCache.java @@ -32,7 +32,6 @@ import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +39,8 @@ /** * Class that provides file-content caching during reading. * - *

The file-content caching is initiated by calling {@link ContentCache#tryCache(FileIO, String, - * long)}. Given a FileIO, a file location string, and file length that is within allowed limit, + *

The file-content caching is initiated by calling {@link ContentCache#tryCache(InputFile)}. + * Given a FileIO, a file location string, and file length that is within allowed limit, * ContentCache will return a {@link CachingInputFile} that is backed by the cache. Calling {@link * CachingInputFile#newStream()} will return a {@link ByteBufferInputStream} backed by list of * {@link ByteBuffer} from the cache if such file-content exist in the cache. If the file-content @@ -56,7 +55,7 @@ public class ContentCache { private final long expireAfterAccessMs; private final long maxTotalBytes; private final long maxContentLength; - private final Cache cache; + private final Cache cache; /** * Constructor for ContentCache class. @@ -86,11 +85,11 @@ public ContentCache(long expireAfterAccessMs, long maxTotalBytes, long maxConten builder .maximumWeight(maxTotalBytes) .weigher( - (Weigher) + (Weigher) (key, value) -> (int) Math.min(value.length, Integer.MAX_VALUE)) .softValues() .removalListener( - (location, cacheEntry, cause) -> + (location, fileContent, cause) -> LOG.debug("Evicted {} from ContentCache ({})", location, cause)) .recordStats() .build(); @@ -112,11 +111,11 @@ public CacheStats stats() { return cache.stats(); } - public CacheEntry get(String key, Function mappingFunction) { + public FileContent get(String key, Function mappingFunction) { return cache.get(key, mappingFunction); } - public CacheEntry getIfPresent(String location) { + public FileContent getIfPresent(String location) { return cache.getIfPresent(location); } @@ -127,17 +126,15 @@ public CacheEntry getIfPresent(String location) { * and no caching will be done for that file. Otherwise, this method will return a {@link * CachingInputFile} that serve file reads backed by ContentCache. * - * @param io a FileIO associated with the location. - * @param location URL/path of a file accessible by io. - * @param length the known length of such file. + * @param input an InputFile to cache * @return a {@link CachingInputFile} if length is within allowed limit. Otherwise, a regular * {@link InputFile} for given location. */ - public InputFile tryCache(FileIO io, String location, long length) { - if (length <= maxContentLength) { - return new CachingInputFile(this, io, location, length); + public InputFile tryCache(InputFile input) { + if (input.getLength() <= maxContentLength) { + return new CachingInputFile(this, input); } - return io.newInputFile(location, length); + return input; } public void invalidate(String key) { @@ -166,11 +163,11 @@ public String toString() { .toString(); } - private static class CacheEntry { + private static class FileContent { private final long length; private final List buffers; - private CacheEntry(long length, List buffers) { + private FileContent(long length, List buffers) { this.length = length; this.buffers = buffers; } @@ -187,34 +184,20 @@ private CacheEntry(long length, List buffers) { */ private static class CachingInputFile implements InputFile { private final ContentCache contentCache; - private final FileIO io; - private final String location; - private final long length; - private InputFile fallbackInputFile = null; + private final InputFile input; - private CachingInputFile(ContentCache cache, FileIO io, String location, long length) { + private CachingInputFile(ContentCache cache, InputFile input) { this.contentCache = cache; - this.io = io; - this.location = location; - this.length = length; - } - - private InputFile wrappedInputFile() { - if (fallbackInputFile == null) { - fallbackInputFile = io.newInputFile(location, length); - } - return fallbackInputFile; + this.input = input; } @Override public long getLength() { - CacheEntry buf = contentCache.getIfPresent(location); + FileContent buf = contentCache.getIfPresent(input.location()); if (buf != null) { return buf.length; - } else if (fallbackInputFile != null) { - return fallbackInputFile.getLength(); } else { - return length; + return input.getLength(); } } @@ -232,80 +215,63 @@ public long getLength() { @Override public SeekableInputStream newStream() { try { - // read from cache if file length is less than or equal to maximum length allowed to - // cache. - if (getLength() <= contentCache.maxContentLength()) { - return cachedStream(); - } - - // fallback to non-caching input stream. - return wrappedInputFile().newStream(); + return cachedStream(); } catch (FileNotFoundException e) { - throw new NotFoundException( - e, "Failed to open input stream for file %s: %s", location, e.toString()); + throw new NotFoundException(e, "Failed to open file: %s", input.location()); } catch (IOException e) { - throw new UncheckedIOException( - String.format("Failed to open input stream for file %s: %s", location, e), e); + return input.newStream(); } } @Override public String location() { - return location; + return input.location(); } @Override public boolean exists() { - CacheEntry buf = contentCache.getIfPresent(location); - return buf != null || wrappedInputFile().exists(); - } - - private CacheEntry cacheEntry() { - long start = System.currentTimeMillis(); - try (SeekableInputStream stream = wrappedInputFile().newStream()) { - long fileLength = getLength(); - long totalBytesToRead = fileLength; - List buffers = Lists.newArrayList(); - - while (totalBytesToRead > 0) { - // read the stream in 4MB chunk - int bytesToRead = (int) Math.min(BUFFER_CHUNK_SIZE, totalBytesToRead); - byte[] buf = new byte[bytesToRead]; - int bytesRead = IOUtil.readRemaining(stream, buf, 0, bytesToRead); - totalBytesToRead -= bytesRead; - - if (bytesRead < bytesToRead) { - // Read less than it should be, possibly hitting EOF. Abandon caching by throwing - // IOException and let the caller fallback to non-caching input file. - throw new IOException( - String.format( - "Expected to read %d bytes, but only %d bytes read.", - fileLength, fileLength - totalBytesToRead)); - } else { - buffers.add(ByteBuffer.wrap(buf)); - } - } - - CacheEntry newEntry = new CacheEntry(fileLength, buffers); - LOG.debug("cacheEntry took {} ms for {}", (System.currentTimeMillis() - start), location); - return newEntry; - } catch (IOException ex) { - throw new UncheckedIOException(ex); - } + FileContent buf = contentCache.getIfPresent(input.location()); + return buf != null || input.exists(); } private SeekableInputStream cachedStream() throws IOException { try { - CacheEntry entry = contentCache.get(location, k -> cacheEntry()); - Preconditions.checkNotNull( - entry, "CacheEntry should not be null when there is no RuntimeException occurs"); - LOG.debug("Cache stats: {}", contentCache.stats()); - return ByteBufferInputStream.wrap(entry.buffers); + FileContent content = contentCache.get(input.location(), k -> download(input)); + return ByteBufferInputStream.wrap(content.buffers); } catch (UncheckedIOException ex) { throw ex.getCause(); - } catch (RuntimeException ex) { - throw new IOException("Caught an error while reading from cache", ex); } } } + + private static FileContent download(InputFile input) { + try (SeekableInputStream stream = input.newStream()) { + long fileLength = input.getLength(); + long totalBytesToRead = fileLength; + List buffers = Lists.newArrayList(); + + while (totalBytesToRead > 0) { + // read the stream in chunks + int bytesToRead = (int) Math.min(BUFFER_CHUNK_SIZE, totalBytesToRead); + byte[] buf = new byte[bytesToRead]; + int bytesRead = IOUtil.readRemaining(stream, buf, 0, bytesToRead); + totalBytesToRead -= bytesRead; + + if (bytesRead < bytesToRead) { + // Read less than it should be, possibly hitting EOF. Abandon caching by throwing + // IOException and let the caller fallback to non-caching input file. + throw new IOException( + String.format( + "Failed to read %d bytes: %d bytes in stream", + fileLength, fileLength - totalBytesToRead)); + } else { + buffers.add(ByteBuffer.wrap(buf)); + } + } + + return new FileContent(fileLength, buffers); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } } From 1fbc85a26472ddd99aa49a34b03871ed815b3d1b Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 18 Feb 2024 12:23:47 -0800 Subject: [PATCH 3/9] Fix test failures and review comments. --- .../iceberg/encryption/EncryptingFileIO.java | 4 ++ .../java/org/apache/iceberg/io/FileIO.java | 6 +-- .../hadoop/TestCatalogUtilDropTable.java | 42 +++++++++++-------- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java index 82670232a8a7..ea63b29754bc 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java @@ -36,6 +36,10 @@ public class EncryptingFileIO implements FileIO, Serializable { public static EncryptingFileIO create(FileIO io, EncryptionManager em) { + if (io instanceof EncryptingFileIO) { + return (EncryptingFileIO) io; + } + return new EncryptingFileIO(io, em); } diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index 503b7edb397f..fc6a53367f21 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -49,7 +49,7 @@ default InputFile newInputFile(String path, long length) { default InputFile newInputFile(DataFile file) { Preconditions.checkArgument( file.keyMetadata() == null, - "Cannot decrypt data file: {} (use DecryptingFileIO)", + "Cannot decrypt data file: {} (use EncryptingFileIO)", file.path()); return newInputFile(file.path().toString()); } @@ -57,7 +57,7 @@ default InputFile newInputFile(DataFile file) { default InputFile newInputFile(DeleteFile file) { Preconditions.checkArgument( file.keyMetadata() == null, - "Cannot decrypt delete file: {} (use DecryptingFileIO)", + "Cannot decrypt delete file: {} (use EncryptingFileIO)", file.path()); return newInputFile(file.path().toString()); } @@ -65,7 +65,7 @@ default InputFile newInputFile(DeleteFile file) { default InputFile newInputFile(ManifestFile manifest) { Preconditions.checkArgument( manifest.keyMetadata() == null, - "Cannot decrypt manifest: {} (use DecryptingFileIO)", + "Cannot decrypt manifest: {} (use EncryptingFileIO)", manifest.path()); return newInputFile(manifest.path()); } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java index 9cb44b1341cb..e3d2b4d8db92 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java @@ -28,6 +28,8 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.GenericBlobMetadata; import org.apache.iceberg.GenericStatisticsFile; import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; @@ -90,13 +92,7 @@ public void dropTableDataDeletesExpectedFiles() throws IOException { .as("should have 1 partition stats file") .containsExactly(partitionStatisticsFile.path()); - FileIO fileIO = Mockito.mock(FileIO.class); - Mockito.when(fileIO.newInputFile(Mockito.anyString())) - .thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0))); - Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong())) - .thenAnswer( - invocation -> - table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1))); + FileIO fileIO = createMockFileIO(table.io()); CatalogUtil.dropTableData(fileIO, tableMetadata); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); @@ -141,14 +137,7 @@ public void dropTableDataDoNotThrowWhenDeletesFail() { TableMetadata tableMetadata = readMetadataVersion(3); Set snapshotSet = Sets.newHashSet(table.snapshots()); - FileIO fileIO = Mockito.mock(FileIO.class); - Mockito.when(fileIO.newInputFile(Mockito.anyString())) - .thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0))); - Mockito.when(fileIO.newInputFile(Mockito.anyString(), Mockito.anyLong())) - .thenAnswer( - invocation -> - table.io().newInputFile(invocation.getArgument(0), invocation.getArgument(1))); - Mockito.doThrow(new RuntimeException()).when(fileIO).deleteFile(ArgumentMatchers.anyString()); + FileIO fileIO = createMockFileIO(table.io()); CatalogUtil.dropTableData(fileIO, tableMetadata); Mockito.verify( @@ -176,9 +165,7 @@ public void shouldNotDropDataFilesIfGcNotEnabled() { Assertions.assertThat(manifestListLocations).as("should have 2 manifest lists").hasSize(2); Assertions.assertThat(metadataLocations).as("should have 4 metadata locations").hasSize(4); - FileIO fileIO = Mockito.mock(FileIO.class); - Mockito.when(fileIO.newInputFile(Mockito.anyString())) - .thenAnswer(invocation -> table.io().newInputFile(invocation.getArgument(0))); + FileIO fileIO = createMockFileIO(table.io()); CatalogUtil.dropTableData(fileIO, tableMetadata); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); @@ -201,6 +188,25 @@ public void shouldNotDropDataFilesIfGcNotEnabled() { .containsAll(metadataLocations); } + private static FileIO createMockFileIO(FileIO wrapped) { + FileIO mockIO = Mockito.mock(FileIO.class); + + Mockito.when(mockIO.newInputFile(Mockito.anyString())) + .thenAnswer(invocation -> wrapped.newInputFile((String) invocation.getArgument(0))); + Mockito.when(mockIO.newInputFile(Mockito.anyString(), Mockito.anyLong())) + .thenAnswer( + invocation -> + wrapped.newInputFile(invocation.getArgument(0), invocation.getArgument(1))); + Mockito.when(mockIO.newInputFile(Mockito.any(ManifestFile.class))) + .thenAnswer(invocation -> wrapped.newInputFile((ManifestFile) invocation.getArgument(0))); + Mockito.when(mockIO.newInputFile(Mockito.any(DataFile.class))) + .thenAnswer(invocation -> wrapped.newInputFile((DataFile) invocation.getArgument(0))); + Mockito.when(mockIO.newInputFile(Mockito.any(DeleteFile.class))) + .thenAnswer(invocation -> wrapped.newInputFile((DeleteFile) invocation.getArgument(0))); + + return mockIO; + } + private static Set manifestListLocations(Set snapshotSet) { return snapshotSet.stream().map(Snapshot::manifestListLocation).collect(Collectors.toSet()); } From 18f758e36cc70b26b93e3acf793f42b3186fe4c6 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 18 Feb 2024 12:28:08 -0800 Subject: [PATCH 4/9] Add back public method as deprecated. --- .../org/apache/iceberg/io/ContentCache.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/io/ContentCache.java b/core/src/main/java/org/apache/iceberg/io/ContentCache.java index f7205c0f44c2..6cc8d591f6e5 100644 --- a/core/src/main/java/org/apache/iceberg/io/ContentCache.java +++ b/core/src/main/java/org/apache/iceberg/io/ContentCache.java @@ -119,6 +119,25 @@ public FileContent getIfPresent(String location) { return cache.getIfPresent(location); } + /** + * Try cache the file-content of file in the given location upon stream reading. + * + *

If length is longer than maximum length allowed by ContentCache, a regular {@link InputFile} + * and no caching will be done for that file. Otherwise, this method will return a {@link + * CachingInputFile} that serve file reads backed by ContentCache. + * + * @param io a FileIO associated with the location. + * @param location URL/path of a file accessible by io. + * @param length the known length of such file. + * @return a {@link CachingInputFile} if length is within allowed limit. Otherwise, a regular + * {@link InputFile} for given location. + * @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead + */ + @Deprecated + public InputFile tryCache(FileIO io, String location, long length) { + return tryCache(io.newInputFile(location, length)); + } + /** * Try cache the file-content of file in the given location upon stream reading. * From 491754aacc7097287234320bbc10cd94a966b9ab Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 18 Feb 2024 12:49:47 -0800 Subject: [PATCH 5/9] Fix revapi for ContentCache. --- .../src/main/java/org/apache/iceberg/io/ContentCache.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/io/ContentCache.java b/core/src/main/java/org/apache/iceberg/io/ContentCache.java index 6cc8d591f6e5..c1b6788968d9 100644 --- a/core/src/main/java/org/apache/iceberg/io/ContentCache.java +++ b/core/src/main/java/org/apache/iceberg/io/ContentCache.java @@ -182,7 +182,13 @@ public String toString() { .toString(); } - private static class FileContent { + /** + * @deprecated will be removed in 1.7; use {@link FileContent} instead. + */ + @Deprecated + private static class CacheEntry {} + + private static class FileContent extends CacheEntry { private final long length; private final List buffers; From 888dffc2ea99248a279d9c20df5243d06b8d3fc3 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 18 Feb 2024 12:54:54 -0800 Subject: [PATCH 6/9] Deprecate ContentCache methods that expose private classes. --- .../org/apache/iceberg/io/ContentCache.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/ContentCache.java b/core/src/main/java/org/apache/iceberg/io/ContentCache.java index c1b6788968d9..0d2a19351e8c 100644 --- a/core/src/main/java/org/apache/iceberg/io/ContentCache.java +++ b/core/src/main/java/org/apache/iceberg/io/ContentCache.java @@ -111,26 +111,23 @@ public CacheStats stats() { return cache.stats(); } + /** + * @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead + */ + @Deprecated public FileContent get(String key, Function mappingFunction) { return cache.get(key, mappingFunction); } + /** + * @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead + */ + @Deprecated public FileContent getIfPresent(String location) { return cache.getIfPresent(location); } /** - * Try cache the file-content of file in the given location upon stream reading. - * - *

If length is longer than maximum length allowed by ContentCache, a regular {@link InputFile} - * and no caching will be done for that file. Otherwise, this method will return a {@link - * CachingInputFile} that serve file reads backed by ContentCache. - * - * @param io a FileIO associated with the location. - * @param location URL/path of a file accessible by io. - * @param length the known length of such file. - * @return a {@link CachingInputFile} if length is within allowed limit. Otherwise, a regular - * {@link InputFile} for given location. * @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead */ @Deprecated From a3013c1193d3fdd9c42b7897b1db5c4e093b55f4 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 18 Feb 2024 13:03:37 -0800 Subject: [PATCH 7/9] Apply spotless. --- .../java/org/apache/iceberg/ManifestFiles.java | 8 ++++---- .../java/org/apache/iceberg/io/ContentCache.java | 16 ++++------------ 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index 61dd517365f6..f7b1add6bdc5 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -356,10 +356,10 @@ private static InputFile newInputFile(FileIO io, ManifestFile manifest) { static boolean cachingEnabled(FileIO io) { try { - return PropertyUtil.propertyAsBoolean( - io.properties(), - CatalogProperties.IO_MANIFEST_CACHE_ENABLED, - CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT); + return PropertyUtil.propertyAsBoolean( + io.properties(), + CatalogProperties.IO_MANIFEST_CACHE_ENABLED, + CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT); } catch (UnsupportedOperationException e) { return false; } diff --git a/core/src/main/java/org/apache/iceberg/io/ContentCache.java b/core/src/main/java/org/apache/iceberg/io/ContentCache.java index 0d2a19351e8c..27ef1a2b7f72 100644 --- a/core/src/main/java/org/apache/iceberg/io/ContentCache.java +++ b/core/src/main/java/org/apache/iceberg/io/ContentCache.java @@ -111,25 +111,19 @@ public CacheStats stats() { return cache.stats(); } - /** - * @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead - */ + /** @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead */ @Deprecated public FileContent get(String key, Function mappingFunction) { return cache.get(key, mappingFunction); } - /** - * @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead - */ + /** @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead */ @Deprecated public FileContent getIfPresent(String location) { return cache.getIfPresent(location); } - /** - * @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead - */ + /** @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead */ @Deprecated public InputFile tryCache(FileIO io, String location, long length) { return tryCache(io.newInputFile(location, length)); @@ -179,9 +173,7 @@ public String toString() { .toString(); } - /** - * @deprecated will be removed in 1.7; use {@link FileContent} instead. - */ + /** @deprecated will be removed in 1.7; use {@link FileContent} instead. */ @Deprecated private static class CacheEntry {} From 3d5191468ff41da5eef6e5e3c08a6f2f0202e97b Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 18 Feb 2024 13:09:26 -0800 Subject: [PATCH 8/9] Return CacheEntry. --- core/src/main/java/org/apache/iceberg/io/ContentCache.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/ContentCache.java b/core/src/main/java/org/apache/iceberg/io/ContentCache.java index 27ef1a2b7f72..e529403b2cf1 100644 --- a/core/src/main/java/org/apache/iceberg/io/ContentCache.java +++ b/core/src/main/java/org/apache/iceberg/io/ContentCache.java @@ -113,13 +113,13 @@ public CacheStats stats() { /** @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead */ @Deprecated - public FileContent get(String key, Function mappingFunction) { + public CacheEntry get(String key, Function mappingFunction) { return cache.get(key, mappingFunction); } /** @deprecated will be removed in 1.7; use {@link #tryCache(InputFile)} instead */ @Deprecated - public FileContent getIfPresent(String location) { + public CacheEntry getIfPresent(String location) { return cache.getIfPresent(location); } From 0859f18bf6715c2072173c531833eea162cea969 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 19 Feb 2024 09:05:19 -0800 Subject: [PATCH 9/9] Fix uses of getIfPresent. --- core/src/main/java/org/apache/iceberg/io/ContentCache.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/ContentCache.java b/core/src/main/java/org/apache/iceberg/io/ContentCache.java index e529403b2cf1..7942c69d5d77 100644 --- a/core/src/main/java/org/apache/iceberg/io/ContentCache.java +++ b/core/src/main/java/org/apache/iceberg/io/ContentCache.java @@ -207,7 +207,7 @@ private CachingInputFile(ContentCache cache, InputFile input) { @Override public long getLength() { - FileContent buf = contentCache.getIfPresent(input.location()); + FileContent buf = contentCache.cache.getIfPresent(input.location()); if (buf != null) { return buf.length; } else { @@ -244,13 +244,13 @@ public String location() { @Override public boolean exists() { - FileContent buf = contentCache.getIfPresent(input.location()); + FileContent buf = contentCache.cache.getIfPresent(input.location()); return buf != null || input.exists(); } private SeekableInputStream cachedStream() throws IOException { try { - FileContent content = contentCache.get(input.location(), k -> download(input)); + FileContent content = contentCache.cache.get(input.location(), k -> download(input)); return ByteBufferInputStream.wrap(content.buffers); } catch (UncheckedIOException ex) { throw ex.getCause();