From 0fbb04f346e1a55911e888143ceb9dd60e6042b3 Mon Sep 17 00:00:00 2001 From: Alexandre Dutra Date: Thu, 30 May 2024 17:07:54 +0200 Subject: [PATCH] refactoring + forbid metadata json --- .../iceberg/rest/IcebergException.java | 29 +++ .../rest/IcebergApiV1GenericResource.java | 221 +++++++++++------- .../service/rest/IcebergErrorMapper.java | 3 + 3 files changed, 166 insertions(+), 87 deletions(-) create mode 100644 catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergException.java diff --git a/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergException.java b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergException.java new file mode 100644 index 00000000000..6b495e8c9b5 --- /dev/null +++ b/catalog/format/iceberg/src/main/java/org/projectnessie/catalog/formats/iceberg/rest/IcebergException.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.formats.iceberg.rest; + +public class IcebergException extends RuntimeException { + + private final IcebergError error; + + public IcebergException(IcebergError error) { + this.error = error; + } + + public IcebergErrorResponse toErrorResponse() { + return IcebergErrorResponse.icebergErrorResponse(error); + } +} diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1GenericResource.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1GenericResource.java index 09ccb088c32..2d9a610ff9a 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1GenericResource.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergApiV1GenericResource.java @@ -18,7 +18,6 @@ import static java.util.Objects.requireNonNull; import static org.projectnessie.catalog.formats.iceberg.nessie.NessieModelIceberg.icebergBasePrefix; import static org.projectnessie.catalog.formats.iceberg.rest.IcebergError.icebergError; -import static org.projectnessie.catalog.formats.iceberg.rest.IcebergErrorResponse.icebergErrorResponse; import static org.projectnessie.catalog.formats.iceberg.rest.IcebergS3SignResponse.icebergS3SignResponse; import static org.projectnessie.model.Content.Type.ICEBERG_TABLE; @@ -55,7 +54,10 @@ import org.projectnessie.catalog.formats.iceberg.rest.IcebergCatalogOperation; import org.projectnessie.catalog.formats.iceberg.rest.IcebergCommitTransactionRequest; import org.projectnessie.catalog.formats.iceberg.rest.IcebergConfigResponse; +import org.projectnessie.catalog.formats.iceberg.rest.IcebergException; import org.projectnessie.catalog.formats.iceberg.rest.IcebergS3SignRequest; +import org.projectnessie.catalog.formats.iceberg.rest.IcebergS3SignResponse; +import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; import org.projectnessie.catalog.service.api.CatalogCommit; import org.projectnessie.catalog.service.api.SnapshotReqParams; import org.projectnessie.catalog.service.api.SnapshotResponse; @@ -63,8 +65,11 @@ import org.projectnessie.catalog.service.rest.IcebergErrorMapper.IcebergEntityKind; import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.error.NessieNotFoundException; +import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; import org.projectnessie.model.ContentResponse; +import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.IcebergView; /** * Handles Iceberg REST API v1 endpoints that are not strongly associated with a particular entity @@ -112,95 +117,12 @@ public IcebergConfigResponse getConfig( @POST @Path("/v1/{prefix}/s3-sign/{identifier}") @Blocking - public Uni s3sign( + public Uni s3sign( IcebergS3SignRequest request, @PathParam("prefix") String prefix, - @PathParam("identifier") String identifier) - throws NessieNotFoundException { + @PathParam("identifier") String identifier) { - DecodedPrefix decodedPrefix = decodePrefix(prefix); - ContentKey key = ContentKey.fromPathString(identifier); - String requestedS3Uri = S3Utils.asS3Location(request.uri()); - - return collectLocations(decodedPrefix, key) - .filter(requestedS3Uri::startsWith) - .toUni() - .onItem() - .ifNotNull() - .transform(ignored -> doRequestSign(request)) - .replaceIfNullWith(() -> signingRequestDenied(request)); - } - - private Multi collectLocations(DecodedPrefix decodedPrefix, ContentKey key) - throws NessieNotFoundException { - - WarehouseConfig warehouse = catalogConfig.getWarehouse(decodedPrefix.warehouse()); - String basePrefix = icebergBasePrefix(warehouse.location(), key); - - try { - ParsedReference ref = decodedPrefix.parsedReference(); - ContentResponse contentResponse = - nessieApi - .getContent() - .refName(ref.name()) - .hashOnRef(ref.hashWithRelativeSpec()) - .getSingle(key); - - // table exists: fetch snapshot, get additional locations - CompletionStage stage = - catalogService.retrieveSnapshot( - SnapshotReqParams.forSnapshotHttpReq(ref, "iceberg", null), - key, - contentResponse.getContent().getType()); - return Uni.createFrom() - .completionStage(stage) - .map(SnapshotResponse::nessieSnapshot) - .onItem() - .transformToMulti( - snapshot -> - Multi.createFrom() - .emitter( - e -> { - e.emit(snapshot.icebergLocation()); - snapshot.additionalKnownLocations().forEach(e::emit); - e.complete(); - })) - // if location is within the base prefix, use the base prefix; - // otherwise, use the location as-is. This allows for temporary - // locations to be used for signing, as long as they are not - // outside the base prefix. - .map(location -> location.startsWith(basePrefix) ? basePrefix : location); - } catch (NessieContentNotFoundException e) { - - // table does not exist: allow any path under the base prefix - return Multi.createFrom().item(basePrefix); - } - } - - private Response doRequestSign(IcebergS3SignRequest request) { - URI uri = URI.create(request.uri()); - Optional bucket = s3options.extractBucket(uri); - Optional body = Optional.ofNullable(request.body()); - - SigningRequest signingRequest = - SigningRequest.signingRequest( - uri, request.method(), request.region(), bucket, body, request.headers()); - - SigningResponse signed = signer.sign(signingRequest); - - return Response.ok(icebergS3SignResponse(signed.uri().toString(), signed.headers())).build(); - } - - private static Response signingRequestDenied(IcebergS3SignRequest request) { - return Response.status(Status.FORBIDDEN) - .entity( - icebergErrorResponse( - icebergError( - Status.FORBIDDEN.getStatusCode(), - "NotAuthorizedException", - "URI not allowed for signing: " + request.uri(), - List.of()))) - .build(); + return new IcebergRequestSigner(request, prefix, identifier).verifyAndSign(); } @POST @@ -257,4 +179,129 @@ public Uni commitTransaction( .completionStage(catalogService.commit(ref, commit.build(), reqParams)) .map(stream -> stream.reduce(null, (ident, snap) -> ident, (i1, i2) -> i1)); } + + private class IcebergRequestSigner { + + private final ParsedReference ref; + private final ContentKey key; + private final String basePrefix; + private final String requestedS3Uri; + private final IcebergS3SignRequest request; + + public IcebergRequestSigner(IcebergS3SignRequest request, String prefix, String identifier) { + this.request = request; + DecodedPrefix decodedPrefix = decodePrefix(prefix); + ref = decodedPrefix.parsedReference(); + key = ContentKey.fromPathString(identifier); + WarehouseConfig warehouse = catalogConfig.getWarehouse(decodedPrefix.warehouse()); + basePrefix = icebergBasePrefix(warehouse.location(), key); + requestedS3Uri = S3Utils.asS3Location(request.uri()); + } + + Uni verifyAndSign() { + return fetchContent() + .call(this::checkForbiddenLocations) + .flatMap(this::fetchSnapshot) + .onItem() + .transformToMulti(this::collectLocations) + .filter(requestedS3Uri::startsWith) + .toUni() + .onItem() + .ifNull() + .failWith(this::unauthorized) + .replaceWith(request.uri()) + .map(this::sign); + } + + private Uni fetchContent() { + try { + ContentResponse contentResponse = + nessieApi + .getContent() + .refName(ref.name()) + .hashOnRef(ref.hashWithRelativeSpec()) + .getSingle(key); + return Uni.createFrom().item(contentResponse.getContent()); + } catch (NessieContentNotFoundException e) { + return Uni.createFrom().nullItem(); + } catch (NessieNotFoundException e) { + return Uni.createFrom().failure(e); + } + } + + private Uni checkForbiddenLocations(Content content) { + String metadataLocation = null; + if (content instanceof IcebergTable) { + metadataLocation = ((IcebergTable) content).getMetadataLocation(); + } else if (content instanceof IcebergView) { + metadataLocation = ((IcebergView) content).getMetadataLocation(); + } + if (metadataLocation == null || !request.uri().equals(metadataLocation)) { + return Uni.createFrom().nullItem(); + } else { + return Uni.createFrom().failure(unauthorized()); + } + } + + private Uni> fetchSnapshot(Content content) { + if (content != null) { + try { + CompletionStage stage = + catalogService.retrieveSnapshot( + SnapshotReqParams.forSnapshotHttpReq(ref, "iceberg", null), + key, + content.getType()); + return Uni.createFrom().completionStage(stage).map(SnapshotResponse::nessieSnapshot); + } catch (NessieContentNotFoundException ignored) { + } catch (NessieNotFoundException e) { + return Uni.createFrom().failure(e); + } + } + return Uni.createFrom().nullItem(); + } + + private Multi collectLocations(NessieEntitySnapshot snapshot) { + if (snapshot != null) { + // table exists: collect all locations + return Multi.createFrom() + .emitter( + e -> { + e.emit(snapshot.icebergLocation()); + snapshot.additionalKnownLocations().forEach(e::emit); + e.complete(); + }) + // if location is within the base prefix, use the base prefix; + // otherwise, use the location as-is. This allows for temporary + // locations to be used for signing, as long as they are not + // outside the base prefix. + .map(location -> location.startsWith(basePrefix) ? basePrefix : location); + } else { + // table does not exist: allow any path under the base prefix + return Multi.createFrom().item(basePrefix); + } + } + + private IcebergS3SignResponse sign(String uriToSign) { + URI uri = URI.create(uriToSign); + Optional bucket = s3options.extractBucket(uri); + Optional body = Optional.ofNullable(request.body()); + + SigningRequest signingRequest = + SigningRequest.signingRequest( + uri, request.method(), request.region(), bucket, body, request.headers()); + + SigningResponse signed = signer.sign(signingRequest); + + return icebergS3SignResponse(signed.uri().toString(), signed.headers()); + } + + private IcebergException unauthorized() { + return new IcebergException( + icebergError( + Status.FORBIDDEN.getStatusCode(), + "NotAuthorizedException", + "URI not allowed for signing: " + request.uri(), + List.of())); + } + } } diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java index a551448145e..51140966f13 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java @@ -31,6 +31,7 @@ import java.util.concurrent.CompletionException; import java.util.stream.Collectors; import org.projectnessie.catalog.formats.iceberg.rest.IcebergErrorResponse; +import org.projectnessie.catalog.formats.iceberg.rest.IcebergException; import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException; import org.projectnessie.error.BaseNessieClientServerException; import org.projectnessie.error.ContentKeyErrorDetails; @@ -75,6 +76,8 @@ public Response toResponse(Throwable ex, IcebergEntityKind kind) { body = mapNessieError(e, e.getErrorCode(), e.getErrorDetails(), kind); } else if (ex instanceof IllegalArgumentException) { body = errorResponse(400, "IllegalArgumentException", ex.getMessage(), ex); + } else if (ex instanceof IcebergException) { + body = ((IcebergException) ex).toErrorResponse(); } if (body == null) {