Skip to content

Commit

Permalink
refactoring + forbid metadata json
Browse files Browse the repository at this point in the history
  • Loading branch information
adutra committed May 30, 2024
1 parent 6e8930d commit 0fbb04f
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 87 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.formats.iceberg.rest;

public class IcebergException extends RuntimeException {

private final IcebergError error;

public IcebergException(IcebergError error) {
this.error = error;
}

public IcebergErrorResponse toErrorResponse() {
return IcebergErrorResponse.icebergErrorResponse(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static java.util.Objects.requireNonNull;
import static org.projectnessie.catalog.formats.iceberg.nessie.NessieModelIceberg.icebergBasePrefix;
import static org.projectnessie.catalog.formats.iceberg.rest.IcebergError.icebergError;
import static org.projectnessie.catalog.formats.iceberg.rest.IcebergErrorResponse.icebergErrorResponse;
import static org.projectnessie.catalog.formats.iceberg.rest.IcebergS3SignResponse.icebergS3SignResponse;
import static org.projectnessie.model.Content.Type.ICEBERG_TABLE;

Expand Down Expand Up @@ -55,16 +54,22 @@
import org.projectnessie.catalog.formats.iceberg.rest.IcebergCatalogOperation;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergCommitTransactionRequest;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergConfigResponse;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergException;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergS3SignRequest;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergS3SignResponse;
import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot;
import org.projectnessie.catalog.service.api.CatalogCommit;
import org.projectnessie.catalog.service.api.SnapshotReqParams;
import org.projectnessie.catalog.service.api.SnapshotResponse;
import org.projectnessie.catalog.service.config.WarehouseConfig;
import org.projectnessie.catalog.service.rest.IcebergErrorMapper.IcebergEntityKind;
import org.projectnessie.error.NessieContentNotFoundException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.ContentResponse;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;

/**
* Handles Iceberg REST API v1 endpoints that are not strongly associated with a particular entity
Expand Down Expand Up @@ -112,95 +117,12 @@ public IcebergConfigResponse getConfig(
@POST
@Path("/v1/{prefix}/s3-sign/{identifier}")
@Blocking
public Uni<Response> s3sign(
public Uni<IcebergS3SignResponse> s3sign(
IcebergS3SignRequest request,
@PathParam("prefix") String prefix,
@PathParam("identifier") String identifier)
throws NessieNotFoundException {
@PathParam("identifier") String identifier) {

DecodedPrefix decodedPrefix = decodePrefix(prefix);
ContentKey key = ContentKey.fromPathString(identifier);
String requestedS3Uri = S3Utils.asS3Location(request.uri());

return collectLocations(decodedPrefix, key)
.filter(requestedS3Uri::startsWith)
.toUni()
.onItem()
.ifNotNull()
.transform(ignored -> doRequestSign(request))
.replaceIfNullWith(() -> signingRequestDenied(request));
}

private Multi<String> collectLocations(DecodedPrefix decodedPrefix, ContentKey key)
throws NessieNotFoundException {

WarehouseConfig warehouse = catalogConfig.getWarehouse(decodedPrefix.warehouse());
String basePrefix = icebergBasePrefix(warehouse.location(), key);

try {
ParsedReference ref = decodedPrefix.parsedReference();
ContentResponse contentResponse =
nessieApi
.getContent()
.refName(ref.name())
.hashOnRef(ref.hashWithRelativeSpec())
.getSingle(key);

// table exists: fetch snapshot, get additional locations
CompletionStage<SnapshotResponse> stage =
catalogService.retrieveSnapshot(
SnapshotReqParams.forSnapshotHttpReq(ref, "iceberg", null),
key,
contentResponse.getContent().getType());
return Uni.createFrom()
.completionStage(stage)
.map(SnapshotResponse::nessieSnapshot)
.onItem()
.<String>transformToMulti(
snapshot ->
Multi.createFrom()
.emitter(
e -> {
e.emit(snapshot.icebergLocation());
snapshot.additionalKnownLocations().forEach(e::emit);
e.complete();
}))
// if location is within the base prefix, use the base prefix;
// otherwise, use the location as-is. This allows for temporary
// locations to be used for signing, as long as they are not
// outside the base prefix.
.map(location -> location.startsWith(basePrefix) ? basePrefix : location);
} catch (NessieContentNotFoundException e) {

// table does not exist: allow any path under the base prefix
return Multi.createFrom().item(basePrefix);
}
}

private Response doRequestSign(IcebergS3SignRequest request) {
URI uri = URI.create(request.uri());
Optional<String> bucket = s3options.extractBucket(uri);
Optional<String> body = Optional.ofNullable(request.body());

SigningRequest signingRequest =
SigningRequest.signingRequest(
uri, request.method(), request.region(), bucket, body, request.headers());

SigningResponse signed = signer.sign(signingRequest);

return Response.ok(icebergS3SignResponse(signed.uri().toString(), signed.headers())).build();
}

private static Response signingRequestDenied(IcebergS3SignRequest request) {
return Response.status(Status.FORBIDDEN)
.entity(
icebergErrorResponse(
icebergError(
Status.FORBIDDEN.getStatusCode(),
"NotAuthorizedException",
"URI not allowed for signing: " + request.uri(),
List.of())))
.build();
return new IcebergRequestSigner(request, prefix, identifier).verifyAndSign();
}

@POST
Expand Down Expand Up @@ -257,4 +179,129 @@ public Uni<Void> commitTransaction(
.completionStage(catalogService.commit(ref, commit.build(), reqParams))
.map(stream -> stream.reduce(null, (ident, snap) -> ident, (i1, i2) -> i1));
}

private class IcebergRequestSigner {

private final ParsedReference ref;
private final ContentKey key;
private final String basePrefix;
private final String requestedS3Uri;
private final IcebergS3SignRequest request;

public IcebergRequestSigner(IcebergS3SignRequest request, String prefix, String identifier) {
this.request = request;
DecodedPrefix decodedPrefix = decodePrefix(prefix);
ref = decodedPrefix.parsedReference();
key = ContentKey.fromPathString(identifier);
WarehouseConfig warehouse = catalogConfig.getWarehouse(decodedPrefix.warehouse());
basePrefix = icebergBasePrefix(warehouse.location(), key);
requestedS3Uri = S3Utils.asS3Location(request.uri());
}

Uni<IcebergS3SignResponse> verifyAndSign() {
return fetchContent()
.call(this::checkForbiddenLocations)
.flatMap(this::fetchSnapshot)
.onItem()
.transformToMulti(this::collectLocations)
.filter(requestedS3Uri::startsWith)
.toUni()
.onItem()
.ifNull()
.failWith(this::unauthorized)
.replaceWith(request.uri())
.map(this::sign);
}

private Uni<Content> fetchContent() {
try {
ContentResponse contentResponse =
nessieApi
.getContent()
.refName(ref.name())
.hashOnRef(ref.hashWithRelativeSpec())
.getSingle(key);
return Uni.createFrom().item(contentResponse.getContent());
} catch (NessieContentNotFoundException e) {
return Uni.createFrom().nullItem();
} catch (NessieNotFoundException e) {
return Uni.createFrom().failure(e);
}
}

private Uni<?> checkForbiddenLocations(Content content) {
String metadataLocation = null;
if (content instanceof IcebergTable) {
metadataLocation = ((IcebergTable) content).getMetadataLocation();
} else if (content instanceof IcebergView) {
metadataLocation = ((IcebergView) content).getMetadataLocation();
}
if (metadataLocation == null || !request.uri().equals(metadataLocation)) {
return Uni.createFrom().nullItem();
} else {
return Uni.createFrom().failure(unauthorized());
}
}

private Uni<NessieEntitySnapshot<?>> fetchSnapshot(Content content) {
if (content != null) {
try {
CompletionStage<SnapshotResponse> stage =
catalogService.retrieveSnapshot(
SnapshotReqParams.forSnapshotHttpReq(ref, "iceberg", null),
key,
content.getType());
return Uni.createFrom().completionStage(stage).map(SnapshotResponse::nessieSnapshot);
} catch (NessieContentNotFoundException ignored) {
} catch (NessieNotFoundException e) {
return Uni.createFrom().failure(e);
}
}
return Uni.createFrom().nullItem();
}

private Multi<String> collectLocations(NessieEntitySnapshot<?> snapshot) {
if (snapshot != null) {
// table exists: collect all locations
return Multi.createFrom()
.<String>emitter(
e -> {
e.emit(snapshot.icebergLocation());
snapshot.additionalKnownLocations().forEach(e::emit);
e.complete();
})
// if location is within the base prefix, use the base prefix;
// otherwise, use the location as-is. This allows for temporary
// locations to be used for signing, as long as they are not
// outside the base prefix.
.map(location -> location.startsWith(basePrefix) ? basePrefix : location);
} else {
// table does not exist: allow any path under the base prefix
return Multi.createFrom().item(basePrefix);
}
}

private IcebergS3SignResponse sign(String uriToSign) {
URI uri = URI.create(uriToSign);
Optional<String> bucket = s3options.extractBucket(uri);
Optional<String> body = Optional.ofNullable(request.body());

SigningRequest signingRequest =
SigningRequest.signingRequest(
uri, request.method(), request.region(), bucket, body, request.headers());

SigningResponse signed = signer.sign(signingRequest);

return icebergS3SignResponse(signed.uri().toString(), signed.headers());
}

private IcebergException unauthorized() {
return new IcebergException(
icebergError(
Status.FORBIDDEN.getStatusCode(),
"NotAuthorizedException",
"URI not allowed for signing: " + request.uri(),
List.of()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergErrorResponse;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergException;
import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.ContentKeyErrorDetails;
Expand Down Expand Up @@ -75,6 +76,8 @@ public Response toResponse(Throwable ex, IcebergEntityKind kind) {
body = mapNessieError(e, e.getErrorCode(), e.getErrorDetails(), kind);
} else if (ex instanceof IllegalArgumentException) {
body = errorResponse(400, "IllegalArgumentException", ex.getMessage(), ex);
} else if (ex instanceof IcebergException) {
body = ((IcebergException) ex).toErrorResponse();
}

if (body == null) {
Expand Down

0 comments on commit 0fbb04f

Please sign in to comment.