Skip to content

Commit

Permalink
Catalog: Allow multiple SetProperties updates (#10024)
Browse files Browse the repository at this point in the history
Fixes #9999
  • Loading branch information
snazy authored Dec 2, 2024
1 parent bbd5050 commit f4f0ec5
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedStage;
import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toList;
import static org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata.STAGED_PROPERTY;
import static org.projectnessie.catalog.formats.iceberg.nessie.IcebergConstants.NESSIE_COMMIT_ID;
import static org.projectnessie.catalog.formats.iceberg.nessie.IcebergConstants.NESSIE_COMMIT_REF;
import static org.projectnessie.catalog.formats.iceberg.nessie.IcebergConstants.NESSIE_CONTENT_ID;
Expand All @@ -42,6 +44,7 @@
import static org.projectnessie.model.Content.Type.NAMESPACE;
import static org.projectnessie.versioned.RequestMeta.API_READ;

import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -775,44 +778,76 @@ private List<IcebergMetadataUpdate> pruneUpdates(
if (update) {
return op.updates();
}
List<IcebergMetadataUpdate> prunedUpdates = new ArrayList<>(op.updates());
String location = op.getSingleUpdate(SetLocation.class, SetLocation::location).orElse(null);
if (op.hasUpdate(SetProperties.class)) {
Map<String, String> properties =
op.getSingleUpdateValue(SetProperties.class, SetProperties::updates);
if (properties.containsKey(IcebergTableMetadata.STAGED_PROPERTY)) {
prunedUpdates.removeIf(u -> u instanceof SetProperties);
properties = new HashMap<>(properties);
properties.remove(IcebergTableMetadata.STAGED_PROPERTY);
prunedUpdates.add(setProperties(properties));
}
}
if (location == null) {
WarehouseConfig w = lakehouseConfig.catalog().getWarehouse(op.warehouse());
location =
icebergNewEntityBaseLocation(
locationForEntity(
w,
op.getKey(),
op.getType(),
apiContext,
reference.getName(),
reference.getHash())
.toString());
} else {
validateStorageLocation(location)
.ifPresent(
msg -> {
throw new IllegalArgumentException(
format(
"Location for %s '%s' cannot be associated with any configured object storage location: %s",
op.getType().name(), op.getKey(), msg));
});
}

var prunedUpdates = pruneCreateUpdates(op.updates());

var location = setLocationForCreate(reference, op, apiContext);
prunedUpdates.add(setTrustedLocation(location));

return prunedUpdates;
}

/**
 * Determines the storage location to use for a newly created entity.
 *
 * <p>If the operation's updates contain at least one {@code SetLocation}, the location of the
 * <em>last</em> one is checked with {@code validateStorageLocation} and returned unchanged.
 * Otherwise a location is computed from the operation's warehouse configuration.
 *
 * @param reference branch whose name and hash are passed to the location computation
 * @param op catalog operation supplying the updates, key, type and warehouse
 * @param apiContext API context passed to the location computation
 * @return the validated, client-provided location or the computed location
 * @throws IllegalArgumentException if validating a client-provided location yields a message
 */
@VisibleForTesting
String setLocationForCreate(Branch reference, IcebergCatalogOperation op, ApiContext apiContext) {
  // Multiple SetLocation updates are not meaningful; only the last one is considered.
  var requested =
      op.updates().stream()
          .filter(SetLocation.class::isInstance)
          .map(u -> ((SetLocation) u).location())
          .reduce((earlier, later) -> later);

  if (requested.isPresent()) {
    // Externally provided storage locations must be validated before use.
    var location = requested.get();
    validateStorageLocation(location)
        .ifPresent(
            problem -> {
              throw new IllegalArgumentException(
                  format(
                      "Location for %s '%s' cannot be associated with any configured object storage location: %s",
                      op.getType().name(), op.getKey(), problem));
            });
    return location;
  }

  // No location requested: apply the computed & trusted storage location.
  var warehouse = lakehouseConfig.catalog().getWarehouse(op.warehouse());
  var entityLocation =
      locationForEntity(
          warehouse,
          op.getKey(),
          op.getType(),
          apiContext,
          reference.getName(),
          reference.getHash());
  return icebergNewEntityBaseLocation(entityLocation.toString());
}

/**
 * Copies the given updates, stripping {@code STAGED_PROPERTY} from every {@code SetProperties}
 * update.
 *
 * <p>A {@code SetProperties} update that contains the staged property is replaced by a new one
 * without it, or dropped entirely if no other property remains. All other updates are kept
 * as-is and in their original order.
 *
 * @param updates updates of a create operation
 * @return a new, mutable list with the pruned updates
 */
@VisibleForTesting
static List<IcebergMetadataUpdate> pruneCreateUpdates(List<IcebergMetadataUpdate> updates) {
  var pruned = new ArrayList<IcebergMetadataUpdate>();
  for (var update : updates) {
    if (update == null) {
      // Null entries never make it into the result.
      continue;
    }
    if (!(update instanceof SetProperties)) {
      pruned.add(update);
      continue;
    }
    var properties = ((SetProperties) update).updates();
    if (!properties.containsKey(STAGED_PROPERTY)) {
      pruned.add(update);
      continue;
    }
    // Work on a copy, the properties map of the update is not modified.
    var remaining = new HashMap<>(properties);
    remaining.remove(STAGED_PROPERTY);
    if (!remaining.isEmpty()) {
      pruned.add(setProperties(remaining));
    }
  }
  return pruned;
}

private CompletionStage<NessieTableSnapshot> loadExistingTableSnapshot(Content content) {
ObjId snapshotId = snapshotObjIdForContent(content);
return icebergStuff().retrieveIcebergSnapshot(snapshotId, content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.assertj.core.api.InstanceOfAssertFactories.STRING;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.projectnessie.api.v2.params.ParsedReference.parsedReference;
import static org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata.STAGED_PROPERTY;
import static org.projectnessie.catalog.formats.iceberg.nessie.CatalogOps.CATALOG_UPDATE_MULTIPLE;
import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetDefaultSortOrder.setDefaultSortOrder;
import static org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate.SetProperties.setProperties;
import static org.projectnessie.catalog.service.api.SnapshotReqParams.forSnapshotHttpReq;
import static org.projectnessie.model.CommitMeta.fromMessage;
import static org.projectnessie.model.Content.Type.ICEBERG_TABLE;
Expand All @@ -40,14 +44,20 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractThrowableAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.projectnessie.api.v2.params.ParsedReference;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergJson;
import org.projectnessie.catalog.formats.iceberg.meta.IcebergTableMetadata;
import org.projectnessie.catalog.formats.iceberg.nessie.NessieModelIceberg;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergCatalogOperation;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergMetadataUpdate;
import org.projectnessie.catalog.formats.iceberg.rest.ImmutableSetLocation;
import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot;
import org.projectnessie.catalog.service.api.CatalogCommit;
import org.projectnessie.catalog.service.api.CatalogService;
Expand Down Expand Up @@ -107,6 +117,87 @@ public void cleanupAfterNessieCommitFailure() throws Exception {
soft.assertThat(heapStorageBucket.objects()).isEmpty();
}

/**
 * Verifies how {@code setLocationForCreate} resolves the location of a new table: computed when
 * no {@code SetLocation} update is present, otherwise the last client-provided location, which
 * must pass storage location validation.
 */
@Test
public void setLocationForCreate() throws Exception {
  var main = (Branch) api.getReference().refName("main").get();
  var apiContext = apiContext("Catalog", 0);
  var key = ContentKey.of("my", "table");

  // Without a SetLocation update the location is computed, which must not fail.
  soft.assertThatCode(
          () ->
              catalogService.setLocationForCreate(
                  main,
                  IcebergCatalogOperation.builder().key(key).type(ICEBERG_TABLE).build(),
                  apiContext))
      .doesNotThrowAnyException();

  // Client-provided locations that pass validation are returned unchanged.
  for (var location : List.of("s3://foo/bar", "s3://" + BUCKET + "/foo/bar/baz/")) {
    soft.assertThat(
            catalogService.setLocationForCreate(
                main,
                IcebergCatalogOperation.builder()
                    .key(key)
                    .type(ICEBERG_TABLE)
                    .addUpdate(ImmutableSetLocation.of(location, false))
                    .build(),
                apiContext))
        .isEqualTo(location);
  }

  // With multiple SetLocation updates, only the last one is considered.
  soft.assertThat(
          catalogService.setLocationForCreate(
              main,
              IcebergCatalogOperation.builder()
                  .key(key)
                  .type(ICEBERG_TABLE)
                  .addUpdate(ImmutableSetLocation.of("s3://foo/ignored", false))
                  .addUpdate(ImmutableSetLocation.of("s3://foo/bar", false))
                  .build(),
              apiContext))
      .isEqualTo("s3://foo/bar");

  // A location that cannot be mapped to a configured object storage is rejected.
  soft.assertThatIllegalArgumentException()
      .isThrownBy(
          () ->
              catalogService.setLocationForCreate(
                  main,
                  IcebergCatalogOperation.builder()
                      .key(key)
                      .type(ICEBERG_TABLE)
                      .addUpdate(
                          ImmutableSetLocation.of("meep://" + BUCKET + "/foo/bar/baz/", false))
                      .build(),
                  apiContext))
      .withMessage(
          "Location for ICEBERG_TABLE 'my.table' cannot be associated with any configured object storage location: Not an S3 URI");
}

/**
 * Checks that {@code CatalogServiceImpl.pruneCreateUpdates} turns {@code updates} into {@code
 * expected}; the cases are supplied by the static factory method of the same name.
 */
@ParameterizedTest
@MethodSource
public void pruneCreateUpdates(
    List<IcebergMetadataUpdate> updates, List<IcebergMetadataUpdate> expected) {
  var actual = CatalogServiceImpl.pruneCreateUpdates(updates);
  soft.assertThat(actual).isEqualTo(expected);
}

/**
 * Test cases for the {@code pruneCreateUpdates} test, each a pair of input updates and the
 * expected pruned updates.
 */
static Stream<Arguments> pruneCreateUpdates() {
  return Stream.of(
      // Nothing to prune
      arguments(List.of(), List.of()),
      arguments(
          List.of(setDefaultSortOrder(42), setProperties(Map.of("foo", "bar"))),
          List.of(setDefaultSortOrder(42), setProperties(Map.of("foo", "bar")))),
      // Staged property is removed, other properties are kept
      arguments(
          List.of(setProperties(Map.of("foo", "bar", STAGED_PROPERTY, "true"))),
          List.of(setProperties(Map.of("foo", "bar")))),
      // A SetProperties update that becomes empty is dropped
      arguments(List.of(setProperties(Map.of(STAGED_PROPERTY, "true"))), List.of()),
      arguments(
          List.of(
              setProperties(Map.of("foo", "bar")),
              setProperties(Map.of(STAGED_PROPERTY, "true")),
              setDefaultSortOrder(42)),
          List.of(setProperties(Map.of("foo", "bar")), setDefaultSortOrder(42))),
      // Multiple SetProperties updates, each carrying the staged property, are pruned
      // individually and keep their order
      arguments(
          List.of(
              setProperties(Map.of("foo", "bar", STAGED_PROPERTY, "true")),
              setDefaultSortOrder(42),
              setProperties(Map.of("baz", "qux", STAGED_PROPERTY, "true"))),
          List.of(
              setProperties(Map.of("foo", "bar")),
              setDefaultSortOrder(42),
              setProperties(Map.of("baz", "qux")))));
}

@ParameterizedTest
@ValueSource(ints = {0, 1, 2})
public void cleanupAfterObjectIoFailure(int after) throws Exception {
Expand Down

0 comments on commit f4f0ec5

Please sign in to comment.