diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java index 6b196f5ea0..2acd615048 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java @@ -19,7 +19,9 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedStage; +import static java.util.stream.Collectors.toCollection; import static java.util.stream.Collectors.toList; +import static org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata.STAGED_PROPERTY; import static org.projectnessie.catalog.formats.iceberg.nessie.IcebergConstants.NESSIE_COMMIT_ID; import static org.projectnessie.catalog.formats.iceberg.nessie.IcebergConstants.NESSIE_COMMIT_REF; import static org.projectnessie.catalog.formats.iceberg.nessie.IcebergConstants.NESSIE_CONTENT_ID; @@ -42,6 +44,7 @@ import static org.projectnessie.model.Content.Type.NAMESPACE; import static org.projectnessie.versioned.RequestMeta.API_READ; +import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nullable; import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; @@ -775,44 +778,76 @@ private List pruneUpdates( if (update) { return op.updates(); } - List prunedUpdates = new ArrayList<>(op.updates()); - String location = op.getSingleUpdate(SetLocation.class, SetLocation::location).orElse(null); - if (op.hasUpdate(SetProperties.class)) { - Map properties = - op.getSingleUpdateValue(SetProperties.class, SetProperties::updates); - if (properties.containsKey(IcebergTableMetadata.STAGED_PROPERTY)) { - prunedUpdates.removeIf(u -> u instanceof SetProperties); - properties = new HashMap<>(properties); - properties.remove(IcebergTableMetadata.STAGED_PROPERTY); - prunedUpdates.add(setProperties(properties)); - } - } - if (location == null) { - WarehouseConfig w = lakehouseConfig.catalog().getWarehouse(op.warehouse()); - location = - icebergNewEntityBaseLocation( - locationForEntity( - w, - op.getKey(), - op.getType(), - apiContext, - reference.getName(), - reference.getHash()) - .toString()); - } else { - validateStorageLocation(location) - .ifPresent( - msg -> { - throw new IllegalArgumentException( - format( - "Location for %s '%s' cannot be associated with any configured object storage location: %s", - op.getType().name(), op.getKey(), msg)); - }); - } + + var prunedUpdates = pruneCreateUpdates(op.updates()); + + var location = setLocationForCreate(reference, op, apiContext); prunedUpdates.add(setTrustedLocation(location)); + return prunedUpdates; } + @VisibleForTesting + String setLocationForCreate(Branch reference, IcebergCatalogOperation op, ApiContext apiContext) { + return op.updates().stream() + .filter(SetLocation.class::isInstance) + .map(SetLocation.class::cast) + .map(SetLocation::location) + // Consider the _last_ SetLocation update, if there are multiple (not meaningful) + .reduce((a, b) -> b) + .map( + l -> { + // Validate externally provided storage locations + validateStorageLocation(l) + .ifPresent( + msg -> { + throw new IllegalArgumentException( + format( + "Location for %s '%s' cannot be associated with any configured object storage location: %s", + op.getType().name(), op.getKey(), msg)); + }); + return l; + }) + .orElseGet( + () -> { + // Apply the computed & trusted storage location + WarehouseConfig w = lakehouseConfig.catalog().getWarehouse(op.warehouse()); + return icebergNewEntityBaseLocation( + locationForEntity( + w, + op.getKey(), + op.getType(), + apiContext, + reference.getName(), + reference.getHash()) + .toString()); + }); + } + + @VisibleForTesting + static List pruneCreateUpdates(List updates) { + // Iterate over all updates and tweak the 'SetProperties' updates to remove the + // IcebergTableMetadata.STAGED_PROPERTY property, if present. + return updates.stream() + .map( + up -> { + if (up instanceof SetProperties) { + var properties = ((SetProperties) up).updates(); + if (properties.containsKey(STAGED_PROPERTY)) { + properties = new HashMap<>(properties); + properties.remove(STAGED_PROPERTY); + if (properties.isEmpty()) { + return null; + } + return setProperties(properties); + } + } + return up; + }) + .filter(Objects::nonNull) + .collect(toCollection(ArrayList::new)); + } + private CompletionStage loadExistingTableSnapshot(Content content) { ObjId snapshotId = snapshotObjIdForContent(content); return icebergStuff().retrieveIcebergSnapshot(snapshotId, content); diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java index f823b91a52..7f0a278d0b 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java +++ b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestCatalogServiceImpl.java @@ -18,8 +18,12 @@ import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MINUTES; import static org.assertj.core.api.InstanceOfAssertFactories.STRING; +import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.projectnessie.api.v2.params.ParsedReference.parsedReference; +import static org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata.STAGED_PROPERTY; import static org.projectnessie.catalog.formats.iceberg.nessie.CatalogOps.CATALOG_UPDATE_MULTIPLE; +import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetDefaultSortOrder.setDefaultSortOrder; +import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetProperties.setProperties; import static org.projectnessie.catalog.service.api.SnapshotReqParams.forSnapshotHttpReq; import static org.projectnessie.model.CommitMeta.fromMessage; import static org.projectnessie.model.Content.Type.ICEBERG_TABLE; @@ -40,14 +44,20 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.assertj.core.api.AbstractThrowableAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.projectnessie.api.v2.params.ParsedReference; import org.projectnessie.catalog.formats.iceberg.meta.IcebergJson; import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata; import org.projectnessie.catalog.formats.iceberg.nessie.NessieModelIceberg; +import org.projectnessie.catalog.formats.iceberg.rest.IcebergCatalogOperation; +import org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate; +import org.projectnessie.catalog.formats.iceberg.rest.ImmutableSetLocation; import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot; import org.projectnessie.catalog.service.api.CatalogCommit; import org.projectnessie.catalog.service.api.CatalogService; @@ -107,6 +117,87 @@ public void cleanupAfterNessieCommitFailure() throws Exception { soft.assertThat(heapStorageBucket.objects()).isEmpty(); } + @Test + public void setLocationForCreate() throws Exception { + var main = (Branch) api.getReference().refName("main").get(); + var apiContext = apiContext("Catalog", 0); + soft.assertThatCode( + () -> + catalogService.setLocationForCreate( + main, + IcebergCatalogOperation.builder() + .key(ContentKey.of("my", "table")) + .type(ICEBERG_TABLE) + .build(), + apiContext)) + .doesNotThrowAnyException(); + + soft.assertThatCode( + () -> + catalogService.setLocationForCreate( + main, + IcebergCatalogOperation.builder() + .key(ContentKey.of("my", "table")) + .type(ICEBERG_TABLE) + .addUpdate(ImmutableSetLocation.of("s3://foo/bar", false)) + .build(), + apiContext)) + .doesNotThrowAnyException(); + + soft.assertThatCode( + () -> + catalogService.setLocationForCreate( + main, + IcebergCatalogOperation.builder() + .key(ContentKey.of("my", "table")) + .type(ICEBERG_TABLE) + .addUpdate( + ImmutableSetLocation.of("s3://" + BUCKET + "/foo/bar/baz/", false)) + .build(), + apiContext)) + .doesNotThrowAnyException(); + + soft.assertThatIllegalArgumentException() + .isThrownBy( + () -> + catalogService.setLocationForCreate( + main, + IcebergCatalogOperation.builder() + .key(ContentKey.of("my", "table")) + .type(ICEBERG_TABLE) + .addUpdate( + ImmutableSetLocation.of("meep://" + BUCKET + "/foo/bar/baz/", false)) + .build(), + apiContext)) + .withMessage( + "Location for ICEBERG_TABLE 'my.table' cannot be associated with any configured object storage location: Not an S3 URI"); + } + + @ParameterizedTest + @MethodSource + public void pruneCreateUpdates( + List updates, List expected) { + soft.assertThat(CatalogServiceImpl.pruneCreateUpdates(updates)).isEqualTo(expected); + } + + static Stream pruneCreateUpdates() { + return Stream.of( + arguments(List.of(), List.of()), + arguments( + List.of(setDefaultSortOrder(42), setProperties(Map.of("foo", "bar"))), + List.of(setDefaultSortOrder(42), setProperties(Map.of("foo", "bar")))), + arguments( + List.of(setProperties(Map.of("foo", "bar", STAGED_PROPERTY, "true"))), + List.of(setProperties(Map.of("foo", "bar")))), + arguments(List.of(setProperties(Map.of(STAGED_PROPERTY, "true"))), List.of()), + arguments( + List.of( + setProperties(Map.of("foo", "bar")), + setProperties(Map.of(STAGED_PROPERTY, "true")), + setDefaultSortOrder(42)), + List.of(setProperties(Map.of("foo", "bar")), setDefaultSortOrder(42)))); + } + @ParameterizedTest @ValueSource(ints = {0, 1, 2}) public void cleanupAfterObjectIoFailure(int after) throws Exception {