Skip to content

Commit

Permalink
feat: GCS provisioner accepts optional bucket name from transfer requ…
Browse files Browse the repository at this point in the history
…est (#43)

- if the bucket name is not specific in the transfer request (bucket_name in dataAddress), the bucket name to store the destination blob is generated at runtime using the transfer id, otherwise the name passed is used
- GCS provisioner gets (or creates if not-existing) the specific bucket
- GCS sink writes the blob in the provisioned bucket, in case it doesn't overwrite an existing blob with the same name
  • Loading branch information
man8pr authored Sep 18, 2023
1 parent d552947 commit 351b43f
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.eclipse.edc.gcp.storage;

import com.google.cloud.storage.Storage;
import org.eclipse.edc.gcp.common.GcpServiceAccount;
import org.eclipse.edc.gcp.common.GcsBucket;

Expand All @@ -24,15 +23,15 @@
public interface StorageService {

/**
* Creates a new bucket with the given name and location.
* If the bucket exists in the correct location and is empty it is returned.
* Else an Exception is thrown.
* Checks if a bucket with the given name exists in the specific location,
* and returns it. Otherwise, if no bucket with the given name exists,
* creates it.
*
* @param bucketName The name of the bucket
* @param location The location where the data in the bucket will be stored
* @return {@link Storage}
* @param bucketName The name of the bucket, must be unique in GCP
* @param location The location of the bucket (e.g. "EUROPE-WEST3", "EU")
* @return {@link GcsBucket}
*/
GcsBucket getOrCreateEmptyBucket(String bucketName, String location);
GcsBucket getOrCreateBucket(String bucketName, String location);

/**
* Attaches a new role binding to the bucket that grants the service account the specified role on the bucket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public StorageServiceImpl(Storage storageClient, Monitor monitor) {
}

@Override
public GcsBucket getOrCreateEmptyBucket(String bucketName, String location) {
public GcsBucket getOrCreateBucket(String bucketName, String location) {
var bucket = Optional.ofNullable(storageClient.get(bucketName))
.orElseGet(() -> {
monitor.debug("Creating new bucket " + bucketName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void getOrCreateBucketCreatesNewBucket() {

when(storageMock.create(eq(expectedBucketInfo))).thenReturn(expectedBucket);

assertThat(storageService.getOrCreateEmptyBucket(bucketName, bucketLocation).getName()).isEqualTo(bucketName);
assertThat(storageService.getOrCreateBucket(bucketName, bucketLocation).getName()).isEqualTo(bucketName);
}

@Test
Expand All @@ -76,7 +76,7 @@ void getOrCreateBucketReturnsExistingBucket() {

when(storageMock.get(bucketName)).thenReturn(existingBucket);

assertThat(storageService.getOrCreateEmptyBucket(bucketName, bucketLocation).getName()).isEqualTo(bucketName);
assertThat(storageService.getOrCreateBucket(bucketName, bucketLocation).getName()).isEqualTo(bucketName);
}

@Test
Expand All @@ -90,7 +90,7 @@ void getOrCreateBucketFailsIfBucketExistsInWrongRegion() {

when(storageMock.get(bucketName)).thenReturn(existingBucket);

assertThatThrownBy(() -> storageService.getOrCreateEmptyBucket(bucketName, bucketLocation)).isInstanceOf(GcpException.class);
assertThatThrownBy(() -> storageService.getOrCreateBucket(bucketName, bucketLocation)).isInstanceOf(GcpException.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ ResourceDefinition generate(DataRequest dataRequest, Policy policy) {
var id = randomUUID().toString();
var location = destination.getStringProperty(GcsStoreSchema.LOCATION);
var storageClass = destination.getStringProperty(GcsStoreSchema.STORAGE_CLASS);
var bucketName = destination.getStringProperty(GcsStoreSchema.BUCKET_NAME);

return GcsResourceDefinition.Builder.newInstance().id(id).location(location)
.storageClass(storageClass).build();
.storageClass(storageClass).bucketName(bucketName).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.edc.spi.response.StatusResult;
import org.jetbrains.annotations.NotNull;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static java.util.concurrent.CompletableFuture.completedFuture;
Expand Down Expand Up @@ -60,27 +61,31 @@ public boolean canDeprovision(ProvisionedResource resourceDefinition) {
@Override
public CompletableFuture<StatusResult<ProvisionResponse>> provision(
GcsResourceDefinition resourceDefinition, Policy policy) {
var bucketName = resourceDefinition.getId();
var bucketLocation = resourceDefinition.getLocation();
var bucketName = Optional.ofNullable(resourceDefinition.getBucketName())
.orElseGet(() -> {
var generatedBucketName = resourceDefinition.getId();
monitor.debug("GCS bucket name generated: " + generatedBucketName);
return generatedBucketName;
});

monitor.debug("GCS Bucket request submitted: " + bucketName);

var resourceName = resourceDefinition.getId() + "-bucket";
var bucketLocation = resourceDefinition.getLocation();
var resourceName = bucketName + "-bucket";
var processId = resourceDefinition.getTransferProcessId();
try {
var bucket = storageService.getOrCreateEmptyBucket(bucketName, bucketLocation);
if (!storageService.isEmpty(bucketName)) {
return completedFuture(StatusResult.failure(ResponseStatus.FATAL_ERROR, String.format("Bucket: %s already exists and is not empty.", bucketName)));
}
var bucket = storageService.getOrCreateBucket(bucketName, bucketLocation);

// TODO use service account from transfer request, in case defined.
var serviceAccount = createServiceAccount(processId, bucketName);
var token = createBucketAccessToken(bucket, serviceAccount);

var resource = getProvisionedResource(resourceDefinition, resourceName, bucketName, serviceAccount);

var response = ProvisionResponse.Builder.newInstance().resource(resource).secretToken(token).build();
return CompletableFuture.completedFuture(StatusResult.success(response));
} catch (GcpException e) {
return completedFuture(StatusResult.failure(ResponseStatus.FATAL_ERROR, e.toString()));
} catch (GcpException gcpException) {
return completedFuture(StatusResult.failure(ResponseStatus.FATAL_ERROR, gcpException.toString()));
}
}

Expand Down Expand Up @@ -126,13 +131,19 @@ private GcpAccessToken createBucketAccessToken(GcsBucket bucket, GcpServiceAccou
}

private GcsProvisionedResource getProvisionedResource(GcsResourceDefinition resourceDefinition, String resourceName, String bucketName, GcpServiceAccount serviceAccount) {
String serviceAccountEmail = null;
String serviceAccountName = null;
if (serviceAccount != null) {
serviceAccountEmail = serviceAccount.getEmail();
serviceAccountName = serviceAccount.getEmail();
}
return GcsProvisionedResource.Builder.newInstance()
.id(resourceDefinition.getId())
.resourceDefinitionId(resourceDefinition.getId())
.location(resourceDefinition.getLocation())
.storageClass(resourceDefinition.getStorageClass())
.serviceAccountEmail(serviceAccount.getEmail())
.serviceAccountName(serviceAccount.getName())
.serviceAccountEmail(serviceAccountEmail)
.serviceAccountName(serviceAccountName)
.transferProcessId(resourceDefinition.getTransferProcessId())
.resourceName(resourceName)
.bucketName(bucketName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class GcsResourceDefinition extends ResourceDefinition {

private String location;
private String storageClass;
private String bucketName;

private GcsResourceDefinition() {
}
Expand All @@ -34,11 +35,16 @@ public String getStorageClass() {
return this.storageClass;
}

public String getBucketName() {
return this.bucketName;
}

@Override
public Builder toBuilder() {
return initializeBuilder(new Builder())
.location(location)
.storageClass(storageClass);
.storageClass(storageClass)
.bucketName(bucketName);
}

public static class Builder extends ResourceDefinition.Builder<GcsResourceDefinition, Builder> {
Expand All @@ -61,9 +67,16 @@ public Builder storageClass(String storageClass) {
return this;
}

public Builder bucketName(String bucketName) {
resourceDefinition.bucketName = bucketName;
return this;
}

@Override
protected void verify() {
super.verify();
// Bucket name is not required: if not present, provisioner generates a new one
// using the transfer id.
Objects.requireNonNull(resourceDefinition.location, "location");
Objects.requireNonNull(resourceDefinition.storageClass, "storageClass");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void provisionSuccess() {
var serviceAccount = new GcpServiceAccount("test-sa", "sa-name", "description");
var token = new GcpAccessToken("token", 123);

when(storageServiceMock.getOrCreateEmptyBucket(bucketName, bucketLocation)).thenReturn(bucket);
when(storageServiceMock.getOrCreateBucket(bucketName, bucketLocation)).thenReturn(bucket);
when(storageServiceMock.isEmpty(bucketName)).thenReturn(true);
when(iamServiceMock.getOrCreateServiceAccount(anyString(), anyString())).thenReturn(serviceAccount);
doNothing().when(storageServiceMock).addProviderPermissions(bucket, serviceAccount);
Expand All @@ -97,38 +97,36 @@ void provisionSuccess() {
assertThat(secretToken.getToken()).isEqualTo("token");
});

verify(storageServiceMock).getOrCreateEmptyBucket(bucketName, bucketLocation);
verify(storageServiceMock).getOrCreateBucket(bucketName, bucketLocation);
verify(storageServiceMock).addProviderPermissions(bucket, serviceAccount);
verify(iamServiceMock).createAccessToken(serviceAccount);
}

@Test
void provisionFailsIfBucketNotEmpty() {
void provisionSucceedsIfBucketNotEmpty() {
var resourceDefinition = createResourceDefinition();
var bucketName = resourceDefinition.getId();
var bucketLocation = resourceDefinition.getLocation();

when(storageServiceMock.getOrCreateEmptyBucket(bucketName, bucketLocation)).thenReturn(new GcsBucket(bucketName));
when(storageServiceMock.getOrCreateBucket(bucketName, bucketLocation)).thenReturn(new GcsBucket(bucketName));
when(storageServiceMock.isEmpty(bucketName)).thenReturn(false);

var response = provisioner.provision(resourceDefinition, testPolicy).join();

assertThat(response.failed()).isTrue();
assertThat(response.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR);
assertThat(response.failed()).isFalse();

verify(storageServiceMock).getOrCreateEmptyBucket(bucketName, bucketLocation);
verify(storageServiceMock, times(0)).addProviderPermissions(any(), any());
verify(iamServiceMock, times(0)).createAccessToken(any());
verify(storageServiceMock).getOrCreateBucket(bucketName, bucketLocation);
verify(storageServiceMock, times(1)).addProviderPermissions(any(), any());
verify(iamServiceMock, times(1)).createAccessToken(any());
}


@Test
void provisionFailsBecauseOfApiError() {
var resourceDefinition = createResourceDefinition();
var bucketName = resourceDefinition.getId();
var bucketLocation = resourceDefinition.getLocation();

doThrow(new GcpException("some error")).when(storageServiceMock).getOrCreateEmptyBucket(bucketName, bucketLocation);
doThrow(new GcpException("some error")).when(storageServiceMock).getOrCreateBucket(bucketName, bucketLocation);

var response = provisioner.provision(resourceDefinition, testPolicy).join();
assertThat(response.failed()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected StreamResult<Void> transferParts(List<DataSource.Part> parts) {
var sinkBlobName = Optional.ofNullable(blobName)
.orElseGet(part::name);
var destinationBlobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, sinkBlobName)).build();
try (var writer = storageClient.writer(destinationBlobInfo)) {
try (var writer = storageClient.writer(destinationBlobInfo, Storage.BlobWriteOption.doesNotExist())) {
ByteStreams.copy(input, Channels.newOutputStream(writer));
}
} catch (IOException e) {
Expand Down

0 comments on commit 351b43f

Please sign in to comment.