diff --git a/extensions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/storage/StorageService.java b/extensions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/storage/StorageService.java index a6ee316..6d58de9 100644 --- a/extensions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/storage/StorageService.java +++ b/extensions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/storage/StorageService.java @@ -14,7 +14,6 @@ package org.eclipse.edc.gcp.storage; -import com.google.cloud.storage.Storage; import org.eclipse.edc.gcp.common.GcpServiceAccount; import org.eclipse.edc.gcp.common.GcsBucket; @@ -24,15 +23,15 @@ public interface StorageService { /** - * Creates a new bucket with the given name and location. - * If the bucket exists in the correct location and is empty it is returned. - * Else an Exception is thrown. + * Checks if a bucket with the given name exists in the specific location, + * and returns it. Otherwise, if no bucket with the given name exists, + * creates it. * - * @param bucketName The name of the bucket - * @param location The location where the data in the bucket will be stored - * @return {@link Storage} + * @param bucketName The name of the bucket, must be unique in GCP + * @param location The location of the bucket (e.g. "EUROPE-WEST3", "EU") + * @return {@link GcsBucket} */ - GcsBucket getOrCreateEmptyBucket(String bucketName, String location); + GcsBucket getOrCreateBucket(String bucketName, String location); /** * Attaches a new role binding to the bucket that grants the service account the specified role on the bucket diff --git a/extensions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/storage/StorageServiceImpl.java b/extensions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/storage/StorageServiceImpl.java index 2e400cc..075ce4a 100644 --- a/extensions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/storage/StorageServiceImpl.java +++ b/extensions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/storage/StorageServiceImpl.java @@ -37,7 +37,7 @@ public StorageServiceImpl(Storage storageClient, Monitor monitor) { } @Override - public GcsBucket getOrCreateEmptyBucket(String bucketName, String location) { + public GcsBucket getOrCreateBucket(String bucketName, String location) { var bucket = Optional.ofNullable(storageClient.get(bucketName)) .orElseGet(() -> { monitor.debug("Creating new bucket " + bucketName); diff --git a/extensions/common/gcp/gcp-core/src/test/java/org/eclipse/edc/gcp/storage/StorageServiceImplTest.java b/extensions/common/gcp/gcp-core/src/test/java/org/eclipse/edc/gcp/storage/StorageServiceImplTest.java index a2d490f..15f56f2 100644 --- a/extensions/common/gcp/gcp-core/src/test/java/org/eclipse/edc/gcp/storage/StorageServiceImplTest.java +++ b/extensions/common/gcp/gcp-core/src/test/java/org/eclipse/edc/gcp/storage/StorageServiceImplTest.java @@ -62,7 +62,7 @@ void getOrCreateBucketCreatesNewBucket() { when(storageMock.create(eq(expectedBucketInfo))).thenReturn(expectedBucket); - assertThat(storageService.getOrCreateEmptyBucket(bucketName, bucketLocation).getName()).isEqualTo(bucketName); + assertThat(storageService.getOrCreateBucket(bucketName, bucketLocation).getName()).isEqualTo(bucketName); } @Test @@ -76,7 +76,7 @@ void getOrCreateBucketReturnsExistingBucket() { when(storageMock.get(bucketName)).thenReturn(existingBucket); - assertThat(storageService.getOrCreateEmptyBucket(bucketName, bucketLocation).getName()).isEqualTo(bucketName); + assertThat(storageService.getOrCreateBucket(bucketName, bucketLocation).getName()).isEqualTo(bucketName); } @Test @@ -90,7 +90,7 @@ void getOrCreateBucketFailsIfBucketExistsInWrongRegion() { when(storageMock.get(bucketName)).thenReturn(existingBucket); - assertThatThrownBy(() -> storageService.getOrCreateEmptyBucket(bucketName, bucketLocation)).isInstanceOf(GcpException.class); + assertThatThrownBy(() -> storageService.getOrCreateBucket(bucketName, bucketLocation)).isInstanceOf(GcpException.class); } @Test diff --git a/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsConsumerResourceDefinitionGenerator.java b/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsConsumerResourceDefinitionGenerator.java index c897928..e8b271b 100644 --- a/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsConsumerResourceDefinitionGenerator.java +++ b/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsConsumerResourceDefinitionGenerator.java @@ -34,9 +34,10 @@ ResourceDefinition generate(DataRequest dataRequest, Policy policy) { var id = randomUUID().toString(); var location = destination.getStringProperty(GcsStoreSchema.LOCATION); var storageClass = destination.getStringProperty(GcsStoreSchema.STORAGE_CLASS); + var bucketName = destination.getStringProperty(GcsStoreSchema.BUCKET_NAME); return GcsResourceDefinition.Builder.newInstance().id(id).location(location) - .storageClass(storageClass).build(); + .storageClass(storageClass).bucketName(bucketName).build(); } @Override diff --git a/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsProvisioner.java b/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsProvisioner.java index 55953d7..f88a2f7 100644 --- a/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsProvisioner.java +++ b/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsProvisioner.java @@ -31,6 +31,7 @@ import org.eclipse.edc.spi.response.StatusResult; import org.jetbrains.annotations.NotNull; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -60,18 +61,22 @@ public boolean canDeprovision(ProvisionedResource resourceDefinition) { @Override public CompletableFuture> provision( GcsResourceDefinition resourceDefinition, Policy policy) { - var bucketName = resourceDefinition.getId(); - var bucketLocation = resourceDefinition.getLocation(); + var bucketName = Optional.ofNullable(resourceDefinition.getBucketName()) + .orElseGet(() -> { + var generatedBucketName = resourceDefinition.getId(); + monitor.debug("GCS bucket name generated: " + generatedBucketName); + return generatedBucketName; + }); monitor.debug("GCS Bucket request submitted: " + bucketName); - var resourceName = resourceDefinition.getId() + "-bucket"; + var bucketLocation = resourceDefinition.getLocation(); + var resourceName = bucketName + "-bucket"; var processId = resourceDefinition.getTransferProcessId(); try { - var bucket = storageService.getOrCreateEmptyBucket(bucketName, bucketLocation); - if (!storageService.isEmpty(bucketName)) { - return completedFuture(StatusResult.failure(ResponseStatus.FATAL_ERROR, String.format("Bucket: %s already exists and is not empty.", bucketName))); - } + var bucket = storageService.getOrCreateBucket(bucketName, bucketLocation); + + // TODO use service account from transfer request, in case defined. var serviceAccount = createServiceAccount(processId, bucketName); var token = createBucketAccessToken(bucket, serviceAccount); @@ -79,8 +84,8 @@ public CompletableFuture> provision( var response = ProvisionResponse.Builder.newInstance().resource(resource).secretToken(token).build(); return CompletableFuture.completedFuture(StatusResult.success(response)); - } catch (GcpException e) { - return completedFuture(StatusResult.failure(ResponseStatus.FATAL_ERROR, e.toString())); + } catch (GcpException gcpException) { + return completedFuture(StatusResult.failure(ResponseStatus.FATAL_ERROR, gcpException.toString())); } } @@ -126,13 +131,19 @@ private GcpAccessToken createBucketAccessToken(GcsBucket bucket, GcpServiceAccou } private GcsProvisionedResource getProvisionedResource(GcsResourceDefinition resourceDefinition, String resourceName, String bucketName, GcpServiceAccount serviceAccount) { + String serviceAccountEmail = null; + String serviceAccountName = null; + if (serviceAccount != null) { + serviceAccountEmail = serviceAccount.getEmail(); + serviceAccountName = serviceAccount.getEmail(); + } return GcsProvisionedResource.Builder.newInstance() .id(resourceDefinition.getId()) .resourceDefinitionId(resourceDefinition.getId()) .location(resourceDefinition.getLocation()) .storageClass(resourceDefinition.getStorageClass()) - .serviceAccountEmail(serviceAccount.getEmail()) - .serviceAccountName(serviceAccount.getName()) + .serviceAccountEmail(serviceAccountEmail) + .serviceAccountName(serviceAccountName) .transferProcessId(resourceDefinition.getTransferProcessId()) .resourceName(resourceName) .bucketName(bucketName) diff --git a/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsResourceDefinition.java b/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsResourceDefinition.java index e9bb7e2..73f7386 100644 --- a/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsResourceDefinition.java +++ b/extensions/control-plane/provision/provision-gcs/src/main/java/org/eclipse/edc/connector/provision/gcp/GcsResourceDefinition.java @@ -22,6 +22,7 @@ public class GcsResourceDefinition extends ResourceDefinition { private String location; private String storageClass; + private String bucketName; private GcsResourceDefinition() { } @@ -34,11 +35,16 @@ public String getStorageClass() { return this.storageClass; } + public String getBucketName() { + return this.bucketName; + } + @Override public Builder toBuilder() { return initializeBuilder(new Builder()) .location(location) - .storageClass(storageClass); + .storageClass(storageClass) + .bucketName(bucketName); } public static class Builder extends ResourceDefinition.Builder { @@ -61,9 +67,16 @@ public Builder storageClass(String storageClass) { return this; } + public Builder bucketName(String bucketName) { + resourceDefinition.bucketName = bucketName; + return this; + } + @Override protected void verify() { super.verify(); + // Bucket name is not required: if not present, provisioner generates a new one + // using the transfer id. Objects.requireNonNull(resourceDefinition.location, "location"); Objects.requireNonNull(resourceDefinition.storageClass, "storageClass"); } diff --git a/extensions/control-plane/provision/provision-gcs/src/test/java/org/eclipse/edc/connector/provision/gcp/GcsProvisionerTest.java b/extensions/control-plane/provision/provision-gcs/src/test/java/org/eclipse/edc/connector/provision/gcp/GcsProvisionerTest.java index 5742dea..fb100c5 100644 --- a/extensions/control-plane/provision/provision-gcs/src/test/java/org/eclipse/edc/connector/provision/gcp/GcsProvisionerTest.java +++ b/extensions/control-plane/provision/provision-gcs/src/test/java/org/eclipse/edc/connector/provision/gcp/GcsProvisionerTest.java @@ -79,7 +79,7 @@ void provisionSuccess() { var serviceAccount = new GcpServiceAccount("test-sa", "sa-name", "description"); var token = new GcpAccessToken("token", 123); - when(storageServiceMock.getOrCreateEmptyBucket(bucketName, bucketLocation)).thenReturn(bucket); + when(storageServiceMock.getOrCreateBucket(bucketName, bucketLocation)).thenReturn(bucket); when(storageServiceMock.isEmpty(bucketName)).thenReturn(true); when(iamServiceMock.getOrCreateServiceAccount(anyString(), anyString())).thenReturn(serviceAccount); doNothing().when(storageServiceMock).addProviderPermissions(bucket, serviceAccount); @@ -97,38 +97,36 @@ void provisionSuccess() { assertThat(secretToken.getToken()).isEqualTo("token"); }); - verify(storageServiceMock).getOrCreateEmptyBucket(bucketName, bucketLocation); + verify(storageServiceMock).getOrCreateBucket(bucketName, bucketLocation); verify(storageServiceMock).addProviderPermissions(bucket, serviceAccount); verify(iamServiceMock).createAccessToken(serviceAccount); } @Test - void provisionFailsIfBucketNotEmpty() { + void provisionSucceedsIfBucketNotEmpty() { var resourceDefinition = createResourceDefinition(); var bucketName = resourceDefinition.getId(); var bucketLocation = resourceDefinition.getLocation(); - when(storageServiceMock.getOrCreateEmptyBucket(bucketName, bucketLocation)).thenReturn(new GcsBucket(bucketName)); + when(storageServiceMock.getOrCreateBucket(bucketName, bucketLocation)).thenReturn(new GcsBucket(bucketName)); when(storageServiceMock.isEmpty(bucketName)).thenReturn(false); var response = provisioner.provision(resourceDefinition, testPolicy).join(); - assertThat(response.failed()).isTrue(); - assertThat(response.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(response.failed()).isFalse(); - verify(storageServiceMock).getOrCreateEmptyBucket(bucketName, bucketLocation); - verify(storageServiceMock, times(0)).addProviderPermissions(any(), any()); - verify(iamServiceMock, times(0)).createAccessToken(any()); + verify(storageServiceMock).getOrCreateBucket(bucketName, bucketLocation); + verify(storageServiceMock, times(1)).addProviderPermissions(any(), any()); + verify(iamServiceMock, times(1)).createAccessToken(any()); } - @Test void provisionFailsBecauseOfApiError() { var resourceDefinition = createResourceDefinition(); var bucketName = resourceDefinition.getId(); var bucketLocation = resourceDefinition.getLocation(); - doThrow(new GcpException("some error")).when(storageServiceMock).getOrCreateEmptyBucket(bucketName, bucketLocation); + doThrow(new GcpException("some error")).when(storageServiceMock).getOrCreateBucket(bucketName, bucketLocation); var response = provisioner.provision(resourceDefinition, testPolicy).join(); assertThat(response.failed()).isTrue(); diff --git a/extensions/data-plane/data-plane-google-storage/src/main/java/org/eclipse/edc/connector/dataplane/gcp/storage/GcsDataSink.java b/extensions/data-plane/data-plane-google-storage/src/main/java/org/eclipse/edc/connector/dataplane/gcp/storage/GcsDataSink.java index e9a3d4c..c7c2a3d 100644 --- a/extensions/data-plane/data-plane-google-storage/src/main/java/org/eclipse/edc/connector/dataplane/gcp/storage/GcsDataSink.java +++ b/extensions/data-plane/data-plane-google-storage/src/main/java/org/eclipse/edc/connector/dataplane/gcp/storage/GcsDataSink.java @@ -46,7 +46,7 @@ protected StreamResult transferParts(List parts) { var sinkBlobName = Optional.ofNullable(blobName) .orElseGet(part::name); var destinationBlobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, sinkBlobName)).build(); - try (var writer = storageClient.writer(destinationBlobInfo)) { + try (var writer = storageClient.writer(destinationBlobInfo, Storage.BlobWriteOption.doesNotExist())) { ByteStreams.copy(input, Channels.newOutputStream(writer)); } } catch (IOException e) {