Skip to content

Commit

Permalink
Implement early access-checks for Iceberg REST operations (#8768)
Browse files Browse the repository at this point in the history
  • Loading branch information
snazy authored Jul 5, 2024
1 parent 38fef78 commit 0205bd0
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 60 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ as necessary. Empty sections will not end in the release notes.

- Catalog/GCS: Support using the default application credentials
- Catalog/S3: Allow custom key+trust stores
- Catalog: Check privileges earlier

### Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ public interface CatalogService {
* more.
* @param key content key of the table or view
* @param expectedType The expected content-type.
* @param forWrite indicates whether access checks shall be performed for a write/update request
* @return The response is either a response object or callback to produce the result. The latter
* is useful to return results that are quite big, for example Iceberg manifest lists or
* manifest files.
*/
CompletionStage<SnapshotResponse> retrieveSnapshot(
SnapshotReqParams reqParams, ContentKey key, @Nullable Content.Type expectedType)
SnapshotReqParams reqParams,
ContentKey key,
@Nullable Content.Type expectedType,
boolean forWrite)
throws NessieNotFoundException;

Stream<Supplier<CompletionStage<SnapshotResponse>>> retrieveSnapshots(
Expand Down
5 changes: 5 additions & 0 deletions catalog/service/impl/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ dependencies {
testFixturesApi(project(":nessie-catalog-secrets-api"))
testFixturesApi(project(":nessie-object-storage-mock"))
testFixturesApi(project(":nessie-combined-cs"))
testFixturesApi(project(":nessie-services"))
testFixturesApi(project(":nessie-services-config"))
testFixturesApi(project(":nessie-versioned-spi"))
testFixturesApi(project(":nessie-versioned-storage-store"))
testFixturesApi(project(":nessie-rest-services"))

testImplementation(platform(libs.awssdk.bom))
testImplementation("software.amazon.awssdk:s3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ public Stream<Supplier<CompletionStage<SnapshotResponse>>> retrieveSnapshots(

@Override
public CompletionStage<SnapshotResponse> retrieveSnapshot(
SnapshotReqParams reqParams, ContentKey key, @Nullable Content.Type expectedType)
SnapshotReqParams reqParams,
ContentKey key,
@Nullable Content.Type expectedType,
boolean forWrite)
throws NessieNotFoundException {

ParsedReference reference = reqParams.ref();
Expand All @@ -199,6 +202,7 @@ public CompletionStage<SnapshotResponse> retrieveSnapshot(
.getContent()
.refName(reference.name())
.hashOnRef(reference.hashWithRelativeSpec())
.forWrite(forWrite)
.getSingle(key);
Content content = contentResponse.getContent();
if (expectedType != null && !content.getType().equals(expectedType)) {
Expand Down Expand Up @@ -332,7 +336,8 @@ CompletionStage<MultiTableUpdate> commit(ParsedReference reference, CatalogCommi
nessieApi
.getContent()
.refName(reference.name())
.hashOnRef(reference.hashWithRelativeSpec());
.hashOnRef(reference.hashWithRelativeSpec())
.forWrite(true);
commit.getOperations().forEach(op -> contentRequest.key(op.getKey()));
GetMultipleContentsResponse contentsResponse = contentRequest.getWithResponse();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.projectnessie.catalog.formats.iceberg.rest.IcebergUpdateRequirement.AssertCreate.assertTableDoesNotExist;
import static org.projectnessie.catalog.secrets.BasicCredentials.basicCredentials;
import static org.projectnessie.model.Content.Type.ICEBERG_TABLE;
import static org.projectnessie.services.authz.AbstractBatchAccessChecker.NOOP_ACCESS_CHECKER;

import java.time.Clock;
import java.util.Map;
Expand All @@ -37,6 +38,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
Expand All @@ -46,6 +48,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.projectnessie.api.v2.params.ParsedReference;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.catalog.files.s3.S3BucketOptions;
import org.projectnessie.catalog.files.s3.S3ClientSupplier;
import org.projectnessie.catalog.files.s3.S3Clients;
Expand Down Expand Up @@ -73,10 +76,26 @@
import org.projectnessie.nessie.tasks.service.impl.TasksServiceImpl;
import org.projectnessie.objectstoragemock.HeapStorageBucket;
import org.projectnessie.objectstoragemock.ObjectStorageMock;
import org.projectnessie.services.authz.AccessContext;
import org.projectnessie.services.authz.Authorizer;
import org.projectnessie.services.authz.BatchAccessChecker;
import org.projectnessie.services.config.ServerConfig;
import org.projectnessie.services.impl.ConfigApiImpl;
import org.projectnessie.services.impl.ContentApiImpl;
import org.projectnessie.services.impl.DiffApiImpl;
import org.projectnessie.services.impl.TreeApiImpl;
import org.projectnessie.services.rest.RestV2ConfigResource;
import org.projectnessie.services.rest.RestV2TreeResource;
import org.projectnessie.services.spi.ConfigService;
import org.projectnessie.services.spi.ContentService;
import org.projectnessie.services.spi.DiffService;
import org.projectnessie.services.spi.TreeService;
import org.projectnessie.versioned.VersionStore;
import org.projectnessie.versioned.storage.common.persist.Persist;
import org.projectnessie.versioned.storage.testextension.NessiePersist;
import org.projectnessie.versioned.storage.testextension.NessiePersistCache;
import org.projectnessie.versioned.storage.testextension.PersistExtension;
import org.projectnessie.versioned.storage.versionstore.VersionStoreImpl;
import software.amazon.awssdk.http.SdkHttpClient;

@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class})
Expand All @@ -92,10 +111,12 @@ public abstract class AbstractCatalogService {
protected ScheduledExecutorService executor;
protected TasksServiceImpl tasksService;
protected HeapStorageBucket heapStorageBucket;
protected ObjectStorageMock.MockServer server;
protected ObjectStorageMock.MockServer objectStorageServer;
protected SdkHttpClient httpClient;
protected ObjectIO objectIO;
protected CatalogServiceImpl catalogService;
protected NessieApiV2 api;
protected volatile Function<AccessContext, BatchAccessChecker> batchAccessCheckerFactory;

protected ParsedReference commitSingle(Reference branch, ContentKey key)
throws InterruptedException, ExecutionException, BaseNessieClientServerException {
Expand Down Expand Up @@ -128,23 +149,47 @@ protected ParsedReference commitSingle(Reference branch, ContentKey key)

@BeforeEach
public void createCatalogServiceInstance() {
setupTasksService();

setupNessieApi();

setupObjectStorage();

setupObjectIO();

setupCatalogService();
}

private void setupTasksService() {
executor = Executors.newScheduledThreadPool(2);
JavaPoolTasksAsync tasksAsync = new JavaPoolTasksAsync(executor, Clock.systemUTC(), 1L);
tasksService =
new TasksServiceImpl(
tasksAsync,
mock(TaskServiceMetrics.class),
TasksServiceConfig.tasksServiceConfig("t", 1L, 20L));
}

heapStorageBucket = HeapStorageBucket.newHeapStorageBucket();
server =
ObjectStorageMock.builder()
.initAddress("localhost")
.putBuckets(BUCKET, heapStorageBucket.bucket())
.build()
.start();
S3Sessions sessions = new S3Sessions("foo", null);
private void setupCatalogService() {
catalogService = new CatalogServiceImpl();
catalogService.catalogConfig =
ImmutableCatalogConfigForTest.builder()
.defaultWarehouse(WAREHOUSE)
.putWarehouses(
WAREHOUSE,
ImmutableWarehouseConfigForTest.builder()
.location("s3://" + BUCKET + "/foo/bar/baz/")
.build())
.build();
catalogService.tasksService = tasksService;
catalogService.objectIO = objectIO;
catalogService.persist = persist;
catalogService.executor = executor;
catalogService.nessieApi = api;
}

private void setupObjectIO() {
S3Sessions sessions = new S3Sessions("foo", null);
S3Config s3config = S3Config.builder().build();
httpClient = S3Clients.apacheHttpClient(s3config, new SecretsProvider(names -> Map.of()));
S3Options<S3BucketOptions> s3options =
Expand All @@ -153,7 +198,7 @@ public void createCatalogServiceInstance() {
S3ProgrammaticOptions.S3PerBucketOptions.builder()
.accessKey(basicCredentials("foo", "bar"))
.region("eu-central-1")
.endpoint(server.getS3BaseUri())
.endpoint(objectStorageServer.getS3BaseUri())
.pathStyleAccess(true)
.build())
.build();
Expand All @@ -167,40 +212,72 @@ public void createCatalogServiceInstance() {
names.stream()
.collect(Collectors.toMap(k -> k, k -> Map.of("secret", "secret")))),
sessions);
S3ObjectIO objectIO = new S3ObjectIO(clientSupplier, Clock.systemUTC());
objectIO = new S3ObjectIO(clientSupplier, Clock.systemUTC());
}

api = new CombinedClientBuilder().withPersist(persist).build(NessieApiV2.class);
private void setupObjectStorage() {
heapStorageBucket = HeapStorageBucket.newHeapStorageBucket();
objectStorageServer =
ObjectStorageMock.builder()
.initAddress("localhost")
.putBuckets(BUCKET, heapStorageBucket.bucket())
.build()
.start();
}

catalogService = new CatalogServiceImpl();
catalogService.catalogConfig =
ImmutableCatalogConfigForTest.builder()
.defaultWarehouse(WAREHOUSE)
.putWarehouses(
WAREHOUSE,
ImmutableWarehouseConfigForTest.builder()
.location("s3://" + BUCKET + "/foo/bar/baz/")
.build())
.build();
catalogService.tasksService = tasksService;
catalogService.objectIO = objectIO;
catalogService.persist = persist;
catalogService.executor = executor;
catalogService.nessieApi = api;
private void setupNessieApi() {
batchAccessCheckerFactory = accessContext -> NOOP_ACCESS_CHECKER;

ServerConfig config =
new ServerConfig() {
@Override
public String getDefaultBranch() {
return "main";
}

@Override
public boolean sendStacktraceToClient() {
return true;
}
};
VersionStore versionStore = new VersionStoreImpl(persist);
Authorizer authorizer = context -> batchAccessCheckerFactory.apply(context);
AccessContext accessContext = () -> () -> null;
ConfigService configService =
new ConfigApiImpl(config, versionStore, authorizer, accessContext, 2);
TreeService treeService = new TreeApiImpl(config, versionStore, authorizer, accessContext);
ContentService contentService =
new ContentApiImpl(config, versionStore, authorizer, accessContext);
DiffService diffService = new DiffApiImpl(config, versionStore, authorizer, accessContext);

RestV2TreeResource treeResource =
new RestV2TreeResource(configService, treeService, contentService, diffService);
RestV2ConfigResource configResource =
new RestV2ConfigResource(config, versionStore, authorizer, accessContext);
api =
new CombinedClientBuilder()
.withTreeResource(treeResource)
.withConfigResource(configResource)
.build(NessieApiV2.class);
}

@AfterEach
public void shutdown() throws Exception {
try {
server.close();
api.close();
} finally {
try {
httpClient.close();
objectStorageServer.close();
} finally {
try {
tasksService.shutdown().toCompletableFuture().get(5, TimeUnit.MINUTES);
httpClient.close();
} finally {
executor.shutdown();
assertThat(executor.awaitTermination(5, TimeUnit.MINUTES)).isTrue();
try {
tasksService.shutdown().toCompletableFuture().get(5, TimeUnit.MINUTES);
} finally {
executor.shutdown();
assertThat(executor.awaitTermination(5, TimeUnit.MINUTES)).isTrue();
}
}
}
}
Expand Down
Loading

0 comments on commit 0205bd0

Please sign in to comment.