diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 96aa14b128da..488290cecb24 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -55,6 +55,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionManagerFactory; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -134,6 +135,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private SnapshotMode snapshotMode = null; private Object conf = null; private FileIO io = null; + private EncryptionManagerFactory encryptionManagerFactory = null; private MetricsReporter reporter = null; private boolean reportingViaRestEnabled; private CloseableGroup closeables = null; @@ -229,10 +231,12 @@ public void initialize(String name, Map unresolved) { } this.io = newFileIO(SessionContext.createEmpty(), mergedProps); + this.encryptionManagerFactory = CatalogUtil.loadEncryptionManagerFactory(mergedProps); this.fileIOCloser = newFileIOCloser(); this.closeables = new CloseableGroup(); this.closeables.addCloseable(this.io); + this.closeables.addCloseable(this.encryptionManagerFactory); this.closeables.addCloseable(this.client); this.closeables.setSuppressCloseFailure(true); @@ -396,6 +400,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { paths.table(finalIdentifier), session::headers, tableFileIO(context, response.config()), + encryptionManagerFactory, tableMetadata); trackFileIO(ops); @@ -469,6 +474,7 @@ public Table registerTable( paths.table(ident), session::headers, tableFileIO(context, response.config()), + encryptionManagerFactory, response.tableMetadata()); trackFileIO(ops); @@ -688,6 +694,7 @@ public Table create() { paths.table(ident), session::headers, tableFileIO(context, response.config()), + encryptionManagerFactory, response.tableMetadata()); trackFileIO(ops); @@ -710,6 +717,7 @@ public Transaction createTransaction() { paths.table(ident), session::headers, tableFileIO(context, response.config()), + encryptionManagerFactory, RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta); @@ -774,6 +782,7 @@ public Transaction replaceTransaction() { paths.table(ident), session::headers, tableFileIO(context, response.config()), + encryptionManagerFactory, RESTTableOperations.UpdateType.REPLACE, changes.build(), base); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index 0ce1afd93a79..31557c735e62 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -31,6 +31,8 @@ import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.UpdateRequirements; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionManagerFactory; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -54,6 +56,7 @@ enum UpdateType { private final String path; private final Supplier> headers; private final FileIO io; + private final EncryptionManagerFactory encryptionManagerFactory; private final List createChanges; private final TableMetadata replaceBase; private UpdateType updateType; @@ -64,8 +67,17 @@ enum UpdateType { String path, Supplier> headers, FileIO io, + EncryptionManagerFactory encryptionManagerFactory, TableMetadata current) { - this(client, path, headers, io, UpdateType.SIMPLE, Lists.newArrayList(), current); + this( + client, + path, + headers, + io, + encryptionManagerFactory, + UpdateType.SIMPLE, + Lists.newArrayList(), + current); } RESTTableOperations( @@ -73,6 +85,7 @@ enum UpdateType { String path, Supplier> headers, FileIO io, + EncryptionManagerFactory encryptionManagerFactory, UpdateType updateType, List createChanges, TableMetadata current) { @@ -80,6 +93,7 @@ enum UpdateType { this.path = path; this.headers = headers; this.io = io; + this.encryptionManagerFactory = encryptionManagerFactory; this.updateType = updateType; this.createChanges = createChanges; this.replaceBase = current; @@ -194,6 +208,16 @@ public LocationProvider locationProvider() { return LocationProviders.locationsFor(current().location(), current().properties()); } + @Override + public EncryptionManager encryption() { + TableMetadata metadata = current(); + if (null != metadata) { + return encryptionManagerFactory.create(metadata); + } else { + return PlaintextEncryptionManager.instance(); + } + } + @Override public TableOperations temp(TableMetadata uncommittedMetadata) { return new TableOperations() { diff --git a/core/src/test/java/org/apache/iceberg/encryption/EncryptedLocalOutputFile.java b/core/src/test/java/org/apache/iceberg/encryption/EncryptedLocalOutputFile.java new file mode 100644 index 000000000000..ea54263abeae --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/encryption/EncryptedLocalOutputFile.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import static org.apache.iceberg.Files.localOutput; + +import java.io.File; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; + +public class EncryptedLocalOutputFile implements OutputFile, NativelyEncryptedFile { + private OutputFile localOutputFile; + private NativeFileCryptoParameters nativeEncryptionParameters; + + public EncryptedLocalOutputFile(File file) { + localOutputFile = localOutput(file); + } + + @Override + public PositionOutputStream create() { + return localOutputFile.create(); + } + + @Override + public PositionOutputStream createOrOverwrite() { + return localOutputFile.createOrOverwrite(); + } + + @Override + public String location() { + return localOutputFile.location(); + } + + @Override + public InputFile toInputFile() { + return localOutputFile.toInputFile(); + } + + @Override + public NativeFileCryptoParameters nativeCryptoParameters() { + return nativeEncryptionParameters; + } + + @Override + public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) { + this.nativeEncryptionParameters = nativeCryptoParameters; + } +} diff --git a/core/src/test/java/org/apache/iceberg/encryption/kms/UnitestKMS.java b/core/src/test/java/org/apache/iceberg/encryption/kms/UnitestKMS.java new file mode 100644 index 000000000000..19e4515b17a8 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/encryption/kms/UnitestKMS.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption.kms; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class UnitestKMS extends MemoryMockKMS { + public static final String MASTER_KEY_NAME1 = "keyA"; + public static final byte[] MASTER_KEY1 = "0123456789012345".getBytes(StandardCharsets.UTF_8); + public static final String MASTER_KEY_NAME2 = "keyB"; + public static final byte[] MASTER_KEY2 = "1123456789012345".getBytes(StandardCharsets.UTF_8); + + @Override + public void initialize(Map properties) { + masterKeys = + ImmutableMap.of( + MASTER_KEY_NAME1, MASTER_KEY1, + MASTER_KEY_NAME2, MASTER_KEY2); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 69b5ceac8db2..747874e53912 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -31,7 +31,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -43,6 +45,7 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.PartitionSpec; @@ -50,6 +53,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -58,6 +62,14 @@ import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptedLocalOutputFile; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EnvelopeEncryptionManager; +import org.apache.iceberg.encryption.EnvelopeMetadata; +import org.apache.iceberg.encryption.EnvelopeMetadataParser; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.kms.UnitestKMS; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NotAuthorizedException; import org.apache.iceberg.exceptions.NotFoundException; @@ -2194,6 +2206,263 @@ public void testCatalogTokenRefreshDisabledWithCredential(String oauth2ServerUri any()); } + @Test + public void testTableLevelEncryptionManager() { + Namespace namespace = Namespace.of("encryptionManagerFact"); + TableIdentifier plainTextEncryptionManagerId = + TableIdentifier.of(namespace, "PlaintextEncryptionManager"); + TableIdentifier envelopedEncryptionManagerId = + TableIdentifier.of(namespace, "EnvelopedEncryptionManager"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(namespace); + } + + // KMS configuration provide by Table Prop + Map tableProperties = Maps.newHashMap(); + tableProperties.put( + TableProperties.ENCRYPTION_KMS_CLIENT_IMPL, UnitestKMS.class.getCanonicalName()); + tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1); + tableProperties.put(TableProperties.FORMAT_VERSION, "2"); + + Table createdPlainTextEncryptionManagerTable = + catalog().createTable(plainTextEncryptionManagerId, SCHEMA); + Table createdEnvelopedEncryptionManagerTable = + catalog() + .createTable( + envelopedEncryptionManagerId, + SCHEMA, + PartitionSpec.unpartitioned(), + tableProperties); + + EncryptionManager createdPlainTextEncryptionManager = + createdPlainTextEncryptionManagerTable.encryption(); + EncryptionManager createdEnvelopedEncryptionManager = + createdEnvelopedEncryptionManagerTable.encryption(); + + Assertions.assertThat(createdPlainTextEncryptionManager) + .isInstanceOf(PlaintextEncryptionManager.class); + Assertions.assertThat(createdEnvelopedEncryptionManager) + .isInstanceOf(EnvelopeEncryptionManager.class); + + // loaded table encryptionManager + Table plainTextEncryptionManagerTable = catalog().loadTable(plainTextEncryptionManagerId); + Table envelopedEncryptionManagerTable = catalog().loadTable(envelopedEncryptionManagerId); + + EncryptionManager plainTextEncryptionManager = plainTextEncryptionManagerTable.encryption(); + EncryptionManager envelopedEncryptionManager = envelopedEncryptionManagerTable.encryption(); + + Assertions.assertThat(plainTextEncryptionManager) + .isInstanceOf(PlaintextEncryptionManager.class); + Assertions.assertThat(envelopedEncryptionManager).isInstanceOf(EnvelopeEncryptionManager.class); + } + + @Test + public void testCatalogEncryptionManager() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + + // KMS configuration provide by catalog Prop + Map catalogProperties = + ImmutableMap.of( + CatalogProperties.ENCRYPTION_KMS_CLIENT_IMPL, UnitestKMS.class.getCanonicalName()); + catalog.initialize("prod", catalogProperties); + + Namespace namespace = Namespace.of("encryptionManagerFact"); + TableIdentifier plainTextEncryptionManagerId = + TableIdentifier.of(namespace, "PlaintextEncryptionManager"); + TableIdentifier envelopedEncryptionManagerId = + TableIdentifier.of(namespace, "EnvelopedEncryptionManager"); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(namespace); + } + + // KMS key config provided by Table Prop + Map tableProperties = Maps.newHashMap(); + tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME2); + tableProperties.put(TableProperties.FORMAT_VERSION, "2"); + + Table createdPlainTextEncryptionManagerTable = + catalog.createTable(plainTextEncryptionManagerId, SCHEMA); + Table createdEnvelopedEncryptionManagerTable = + catalog.createTable( + envelopedEncryptionManagerId, SCHEMA, PartitionSpec.unpartitioned(), tableProperties); + + EncryptionManager createdPlainTextEncryptionManager = + createdPlainTextEncryptionManagerTable.encryption(); + EncryptionManager createdEnvelopedEncryptionManager = + createdEnvelopedEncryptionManagerTable.encryption(); + + Assertions.assertThat(createdPlainTextEncryptionManager) + .isInstanceOf(PlaintextEncryptionManager.class); + Assertions.assertThat(createdEnvelopedEncryptionManager) + .isInstanceOf(EnvelopeEncryptionManager.class); + + // loaded table encryptionManager + Table plainTextEncryptionManagerTable = catalog.loadTable(plainTextEncryptionManagerId); + Table envelopedEncryptionManagerTable = catalog.loadTable(envelopedEncryptionManagerId); + + EncryptionManager plainTextEncryptionManager = plainTextEncryptionManagerTable.encryption(); + EncryptionManager envelopedEncryptionManager = envelopedEncryptionManagerTable.encryption(); + + Assertions.assertThat(plainTextEncryptionManager) + .isInstanceOf(PlaintextEncryptionManager.class); + Assertions.assertThat(envelopedEncryptionManager).isInstanceOf(EnvelopeEncryptionManager.class); + } + + @Test + public void testCatalogEncryptionSetIgnoreTableProperties() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + + // KMS configuration provide by catalog Prop + Map catalogProperties = + ImmutableMap.of( + CatalogProperties.ENCRYPTION_KMS_CLIENT_IMPL, + UnitestKMS.class.getCanonicalName(), + CatalogProperties.ENCRYPTION_IGNORE_TABLE_PROPS, + "true"); + + catalog.initialize("prod", catalogProperties); + + Namespace namespace = Namespace.of("encryptionManagerFact"); + TableIdentifier plainTextEncryptionManagerId = + TableIdentifier.of(namespace, "PlaintextEncryptionManager"); + TableIdentifier envelopedEncryptionManagerId = + TableIdentifier.of(namespace, "EnvelopedEncryptionManager"); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(namespace); + } + + // KMS key config provided by Table Prop + Map tableProperties = Maps.newHashMap(); + tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME2); + tableProperties.put(TableProperties.FORMAT_VERSION, "2"); + + Table createdPlainTextEncryptionManagerTable = + catalog.createTable(plainTextEncryptionManagerId, SCHEMA); + Table createdEnvelopedEncryptionManagerTable = + catalog.createTable( + envelopedEncryptionManagerId, SCHEMA, PartitionSpec.unpartitioned(), tableProperties); + + EncryptionManager createdPlainTextEncryptionManager = + createdPlainTextEncryptionManagerTable.encryption(); + EncryptionManager createdEnvelopedEncryptionManager = + createdEnvelopedEncryptionManagerTable.encryption(); + + Assertions.assertThat(createdPlainTextEncryptionManager) + .isInstanceOf(PlaintextEncryptionManager.class); + + // When catalog prop sets encryption.ignore.table.properties to true, encryption config in + // TableProperties doesn't take effect + Assertions.assertThat(createdEnvelopedEncryptionManager) + .isInstanceOf(PlaintextEncryptionManager.class); + + // loaded table encryptionManager + Table plainTextEncryptionManagerTable = catalog.loadTable(plainTextEncryptionManagerId); + Table envelopedEncryptionManagerTable = catalog.loadTable(envelopedEncryptionManagerId); + + EncryptionManager plainTextEncryptionManager = plainTextEncryptionManagerTable.encryption(); + EncryptionManager envelopedEncryptionManager = envelopedEncryptionManagerTable.encryption(); + + Assertions.assertThat(plainTextEncryptionManager) + .isInstanceOf(PlaintextEncryptionManager.class); + Assertions.assertThat(envelopedEncryptionManager) + .isInstanceOf(PlaintextEncryptionManager.class); + } + + @Test + public void testCatalogLevelEncryptionManagerWithKeySetting() throws IOException { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + + // KMS impl and key provided by catalog Prop + Map catalogProperties = + ImmutableMap.of( + CatalogProperties.ENCRYPTION_KMS_CLIENT_IMPL, + UnitestKMS.class.getCanonicalName(), + TableProperties.ENCRYPTION_TABLE_KEY, + UnitestKMS.MASTER_KEY_NAME1); + catalog.initialize("prod", catalogProperties); + + Namespace namespace = Namespace.of("encryptionManagerFact"); + TableIdentifier notSetTableEncryptionKey = + TableIdentifier.of(namespace, "NotSetTableEncryptionKey"); + + TableIdentifier setTableEncryptionKey = TableIdentifier.of(namespace, "SetTableEncryptionKey"); + + if (requiresNamespaceCreate()) { + catalog.createNamespace(namespace); + } + + Table noTableKMSKeySetTable = catalog.createTable(notSetTableEncryptionKey, SCHEMA); + + // KMS key config provided to overwrite catalog kms key setting + Map tableProperties = Maps.newHashMap(); + tableProperties.put( + TableProperties.ENCRYPTION_KMS_CLIENT_IMPL, UnitestKMS.class.getCanonicalName()); + tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME2); + tableProperties.put(TableProperties.FORMAT_VERSION, "2"); + Table tableKMSKeySetTable = + catalog.createTable( + setTableEncryptionKey, SCHEMA, PartitionSpec.unpartitioned(), tableProperties); + + EncryptionManager noTableKMSKeySetTableEncryptionManager = noTableKMSKeySetTable.encryption(); + EncryptionManager tableKMSKeySetTableEncryptionManager = tableKMSKeySetTable.encryption(); + + Assertions.assertThat(noTableKMSKeySetTableEncryptionManager) + .isInstanceOf(EnvelopeEncryptionManager.class); + + Assertions.assertThat(tableKMSKeySetTableEncryptionManager) + .isInstanceOf(EnvelopeEncryptionManager.class); + + Path noTableKeySetManagerPath = Paths.get("/tmp/" + UUID.randomUUID() + ".parquet"); + File noTableKeySetManagerFile = createTempFile(noTableKeySetManagerPath); + + EncryptedLocalOutputFile encryptedLocalOutputFileForNoTableKeySet = + new EncryptedLocalOutputFile(noTableKeySetManagerFile); + + EncryptedOutputFile catalogKeyEncrypted = + noTableKMSKeySetTableEncryptionManager.encrypt(encryptedLocalOutputFileForNoTableKeySet); + ByteBuffer serialized = + EnvelopeMetadataParser.toJson((EnvelopeMetadata) catalogKeyEncrypted.keyMetadata()); + EnvelopeMetadata parsedMetadata = EnvelopeMetadataParser.fromJson(serialized); + + Assertions.assertThat(parsedMetadata.kekId()).isEqualTo(UnitestKMS.MASTER_KEY_NAME1); + + Path tableKeySetManagerPath = Paths.get("/tmp/" + UUID.randomUUID() + ".parquet"); + File tableKeySetManagerFile = createTempFile(tableKeySetManagerPath); + + EncryptedLocalOutputFile encryptedLocalOutputFileForTableKeySet = + new EncryptedLocalOutputFile(tableKeySetManagerFile); + + EncryptedOutputFile tableKeyEncrypted = + tableKMSKeySetTableEncryptionManager.encrypt(encryptedLocalOutputFileForTableKeySet); + ByteBuffer serialized2 = + EnvelopeMetadataParser.toJson((EnvelopeMetadata) tableKeyEncrypted.keyMetadata()); + EnvelopeMetadata parsedMetadataTk = EnvelopeMetadataParser.fromJson(serialized2); + + // When KMS key is set on Catalog side, it will take precedence. + Assertions.assertThat(parsedMetadataTk.kekId()).isEqualTo(UnitestKMS.MASTER_KEY_NAME1); + + // loaded table encryptionManager + Table loadedNoSetKMSKeyTable = catalog.loadTable(notSetTableEncryptionKey); + Table loadedSetKMSKeyTable = catalog.loadTable(setTableEncryptionKey); + + EncryptionManager loadedNoSetKMSKeyTableEncryptionManager = loadedNoSetKMSKeyTable.encryption(); + EncryptionManager loadedSetKMSKeyTableEncryptionManager = loadedSetKMSKeyTable.encryption(); + + Assertions.assertThat(loadedNoSetKMSKeyTableEncryptionManager) + .isInstanceOf(EnvelopeEncryptionManager.class); + Assertions.assertThat(loadedSetKMSKeyTableEncryptionManager) + .isInstanceOf(EnvelopeEncryptionManager.class); + } + @Test public void diffAgainstSingleTable() { Namespace namespace = Namespace.of("namespace"); @@ -2546,6 +2815,12 @@ public void testNoCleanupForNonCleanableReplaceTransaction() { .isTrue(); } + static File createTempFile(Path temp) throws IOException { + File tmpFolder = temp.resolve("parquet").toFile(); + String filename = UUID.randomUUID().toString(); + return new File(tmpFolder, FileFormat.PARQUET.addExtension(filename)); + } + private RESTCatalog catalog(RESTCatalogAdapter adapter) { RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTTableOperations.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTTableOperations.java new file mode 100644 index 000000000000..0e853f12b0ee --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTTableOperations.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; + +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.DefaultEncryptionManagerFactory; +import org.apache.iceberg.encryption.EncryptionManagerFactory; +import org.apache.iceberg.encryption.EnvelopeEncryptionManager; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.kms.UnitestKMS; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; + +public class TestRESTTableOperations { + + private static final int PORT = 1080; + private static final String URI = String.format("http://127.0.0.1:%d", PORT); + private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO"; + private static final Schema TABLE_SCHEMA = + new Schema( + required(1, "id", Types.IntegerType.get(), "unique ID 🤪"), + required(2, "data", Types.StringType.get())); + + private static final Map catalogProperties = + ImmutableMap.of(CatalogProperties.ENCRYPTION_IGNORE_TABLE_PROPS, "false"); + private static ClientAndServer mockServer; + private static RESTClient restClient; + private static FileIO fileIO; + private static EncryptionManagerFactory encryptionManagerFactory; + + @BeforeAll + public static void beforeClass() throws Exception { + Configuration conf = new Configuration(); + encryptionManagerFactory = new DefaultEncryptionManagerFactory(); + encryptionManagerFactory.initialize(catalogProperties); + mockServer = startClientAndServer(PORT); + restClient = HTTPClient.builder(ImmutableMap.of()).uri(URI).build(); + fileIO = CatalogUtil.loadFileIO(DEFAULT_FILE_IO_IMPL, catalogProperties, conf); + } + + @AfterAll + public static void stopServer() throws IOException { + mockServer.stop(); + restClient.close(); + fileIO.close(); + encryptionManagerFactory.close(); + } + + @Test + public void testCSESupport() { + Namespace namespace = Namespace.of("encryptionManagerFact"); + TableIdentifier plainTextEncryptionManagerId = + TableIdentifier.of(namespace, "PlaintextEncryptionManager"); + TableIdentifier envelopedEncryptionManagerId = + TableIdentifier.of(namespace, "EnvelopedEncryptionManager"); + ResourcePaths resourcePaths = ResourcePaths.forCatalogProperties(catalogProperties); + + String noCSESupportTable = resourcePaths.table(plainTextEncryptionManagerId); + String cseSupportTable = resourcePaths.table(envelopedEncryptionManagerId); + + TableMetadata tableMetadata = + TableMetadata.newTableMetadata( + TABLE_SCHEMA, + PartitionSpec.unpartitioned(), + "file://tmp/db/table", + ImmutableMap.of( + TableProperties.ENCRYPTION_KMS_CLIENT_IMPL, + UnitestKMS.class.getCanonicalName(), + TableProperties.ENCRYPTION_TABLE_KEY, + UnitestKMS.MASTER_KEY_NAME1, + TableProperties.FORMAT_VERSION, + "2")); + + RESTTableOperations restTableOperationsNoCSESupport = + new RESTTableOperations( + restClient, noCSESupportTable, null, fileIO, encryptionManagerFactory, null); + RESTTableOperations restTableOperationsWithCSESupport = + new RESTTableOperations( + restClient, cseSupportTable, null, fileIO, encryptionManagerFactory, tableMetadata); + Assertions.assertThat(restTableOperationsNoCSESupport.encryption()) + .isInstanceOf(PlaintextEncryptionManager.class); + Assertions.assertThat(restTableOperationsWithCSESupport.encryption()) + .isInstanceOf(EnvelopeEncryptionManager.class); + } +}