From be8fcd6216af1dc4a017bef33fdd267f11ed565a Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Tue, 5 Sep 2023 08:31:20 +0200 Subject: [PATCH] feat: add streaming http-to-http sample (#102) * feat: add streaming http-to-http sample * cleanup * PR remark --- .github/ISSUE_TEMPLATE/bug_report.md | 39 -- .github/ISSUE_TEMPLATE/config.yml | 2 - .github/ISSUE_TEMPLATE/feature_request.md | 21 - .github/PULL_REQUEST_TEMPLATE | 25 - .../basic-01-basic-connector/build.gradle.kts | 2 +- .../basic-02-health-endpoint/build.gradle.kts | 2 +- basic/basic-03-configuration/build.gradle.kts | 2 +- build.gradle.kts | 2 - gradle/libs.versions.toml | 3 + other/custom-runtime/build.gradle.kts | 2 +- settings.gradle.kts | 8 +- system-tests/build.gradle.kts | 5 +- .../FileTransferSampleTestCommon.java | 16 +- .../transfer/streaming/Participant.java | 453 ++++++++++++++++++ .../streaming/Streaming01httpToHttpTest.java | 117 +++++ .../streaming-01-http-to-http/.gitignore | 1 + .../streaming-01-http-to-http/README.md | 155 ++++++ .../streaming-01-http-to-http/asset.json | 10 + .../contract-definition.json | 7 + .../streaming-01-http-to-http/dataplane.json | 9 + .../get-dataset.json | 7 + .../negotiate-contract.json | 22 + .../policy-definition.json | 8 + .../streaming-01-runtime/build.gradle.kts | 46 ++ .../streaming-01-runtime/consumer.properties | 11 + .../streaming-01-runtime/provider.properties | 12 + .../http/HttpStreamingDataSource.java | 124 +++++ .../http/HttpStreamingDataSourceFactory.java | 58 +++ .../http/HttpStreamingExtension.java | 39 ++ ...rg.eclipse.edc.spi.system.ServiceExtension | 1 + .../streaming-01-http-to-http/transfer.json | 15 + .../file-transfer-consumer/build.gradle.kts | 2 +- .../file-transfer-provider/build.gradle.kts | 2 +- .../filetransfer.json | 3 +- .../build.gradle.kts | 2 +- .../build.gradle.kts | 2 +- .../filetransfer.json | 1 - .../open-telemetry-consumer/build.gradle.kts | 2 +- .../open-telemetry-provider/build.gradle.kts | 2 +- .../transfer-05-file-transfer-cloud/README.md | 1 - .../cloud-transfer-consumer/build.gradle.kts | 2 +- .../cloud-transfer-provider/build.gradle.kts | 2 +- .../transfer-06-consumer-pull-http/README.md | 22 +- .../java/org/eclipse/edc/BackendService.java | 44 -- .../eclipse/edc/handler/ReceiverHandler.java | 32 -- .../http-pull-connector/build.gradle.kts | 2 +- .../transfer-07-provider-push-http/README.md | 13 +- .../http-push-connector/build.gradle.kts | 2 +- .../build.gradle.kts | 23 - .../java/org/eclipse/edc/BackendService.java | 44 -- .../eclipse/edc/handler/ReceiverHandler.java | 32 -- util/README.md | 3 + .../http-request-logger}/build.gradle.kts | 6 +- .../samples/util/HttpRequestLoggerServer.java | 56 +++ 54 files changed, 1214 insertions(+), 310 deletions(-) delete mode 100644 .github/ISSUE_TEMPLATE/bug_report.md delete mode 100644 .github/ISSUE_TEMPLATE/config.yml delete mode 100644 .github/ISSUE_TEMPLATE/feature_request.md delete mode 100644 .github/PULL_REQUEST_TEMPLATE create mode 100644 system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Participant.java create mode 100644 system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java create mode 100644 transfer/streaming/streaming-01-http-to-http/.gitignore create mode 100644 transfer/streaming/streaming-01-http-to-http/README.md create mode 100644 transfer/streaming/streaming-01-http-to-http/asset.json create mode 100644 transfer/streaming/streaming-01-http-to-http/contract-definition.json create mode 100644 transfer/streaming/streaming-01-http-to-http/dataplane.json create mode 100644 transfer/streaming/streaming-01-http-to-http/get-dataset.json create mode 100644 transfer/streaming/streaming-01-http-to-http/negotiate-contract.json create mode 100644 transfer/streaming/streaming-01-http-to-http/policy-definition.json create mode 100644 transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build.gradle.kts create mode 100644 transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/consumer.properties create mode 100644 transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/provider.properties create mode 100644 transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSource.java create mode 100644 transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java create mode 100644 transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingExtension.java create mode 100644 transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension create mode 100644 transfer/streaming/streaming-01-http-to-http/transfer.json delete mode 100644 transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/src/main/java/org/eclipse/edc/BackendService.java delete mode 100644 transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/src/main/java/org/eclipse/edc/handler/ReceiverHandler.java delete mode 100644 transfer/transfer-07-provider-push-http/provider-push-http-backend-service/build.gradle.kts delete mode 100644 transfer/transfer-07-provider-push-http/provider-push-http-backend-service/src/main/java/org/eclipse/edc/BackendService.java delete mode 100644 transfer/transfer-07-provider-push-http/provider-push-http-backend-service/src/main/java/org/eclipse/edc/handler/ReceiverHandler.java create mode 100644 util/README.md rename {transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service => util/http-request-logger}/build.gradle.kts (74%) create mode 100644 util/http-request-logger/src/main/java/org/eclipse/edc/samples/util/HttpRequestLoggerServer.java diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md deleted file mode 100644 index 43ac78ef..00000000 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ /dev/null @@ -1,39 +0,0 @@ ---- -name: Bug Report -about: Create a report to help us improve -title: '' -labels: 'bug' -assignees: '' - ---- - -# Bug Report - -## Describe the Bug -_A clear and concise description of the bug._ - -### Expected Behavior -_A clear and concise description of what you expected to happen._ - -### Observed Behavior -_A clear and concise description of what happened instead._ - -## Steps to Reproduce -Steps to reproduce the behavior: -1. Go to '...' -2. Click on '....' -3. Scroll down to '....' -4. See error - -## Context Information -_Add any other context about the problem here._ - -- Used version [e.g. EDC v1.0.0] -- OS: [e.g. iOS, Windows] -- ... - -## Detailed Description -_If applicable, add screenshots and logs to help explain your problem._ - -## Possible Implementation -_You already know the root cause of the erroneous state and how to fix it? Feel free to share your thoughts._ \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml deleted file mode 100644 index c96a9ab2..00000000 --- a/.github/ISSUE_TEMPLATE/config.yml +++ /dev/null @@ -1,2 +0,0 @@ ---- -blank_issues_enabled: true \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md deleted file mode 100644 index 593b7759..00000000 --- a/.github/ISSUE_TEMPLATE/feature_request.md +++ /dev/null @@ -1,21 +0,0 @@ ---- -name: Feature Request -about: Help us with new ideas -title: '' -labels: '' -assignees: '' - ---- - -# Feature Request - -_Briefly describe what feature you are missing or how the project's documentation could be improved._ - -## Why Is the Feature Desired? -_Are there any requirements?_ - -## Solution Proposal -_If possible, provide a (brief!) solution proposal._ - -## Type of Issue -_i.e., new feature, improvement, cleanup, etc._ diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE deleted file mode 100644 index d69530a6..00000000 --- a/.github/PULL_REQUEST_TEMPLATE +++ /dev/null @@ -1,25 +0,0 @@ -## What this PR changes/adds - -_Briefly describe WHAT your pr changes, which features it adds/modifies._ - -## Why it does that - -_Briefly state why the change was necessary._ - -## Further notes - -_List other areas of code that have changed but are not necessarily linked to the main feature. This could be method -signature changes, package declarations, bugs that were encountered and were fixed inline, etc._ - -## Linked Issue(s) - -Closes # <-- _insert Issue number if one exists_ - -## Checklist - -- [ ] performed checkstyle check locally? -- [ ] added/updated copyright headers? -- [ ] documented public classes/methods? -- [ ] documented code? -- [ ] assigned appropriate label? -- [ ] formatted title correctly? (_take a look at the [CONTRIBUTING](https://github.com/eclipse-edc/Samples/blob/main/CONTRIBUTING.md#submit-a-pull-request) for details_) \ No newline at end of file diff --git a/basic/basic-01-basic-connector/build.gradle.kts b/basic/basic-01-basic-connector/build.gradle.kts index 6cd2d3e2..6a58db1d 100644 --- a/basic/basic-01-basic-connector/build.gradle.kts +++ b/basic/basic-01-basic-connector/build.gradle.kts @@ -16,7 +16,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/basic/basic-02-health-endpoint/build.gradle.kts b/basic/basic-02-health-endpoint/build.gradle.kts index a88eff81..b00dfe2e 100644 --- a/basic/basic-02-health-endpoint/build.gradle.kts +++ b/basic/basic-02-health-endpoint/build.gradle.kts @@ -16,7 +16,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/basic/basic-03-configuration/build.gradle.kts b/basic/basic-03-configuration/build.gradle.kts index ea2e833b..a281694f 100644 --- a/basic/basic-03-configuration/build.gradle.kts +++ b/basic/basic-03-configuration/build.gradle.kts @@ -17,7 +17,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/build.gradle.kts b/build.gradle.kts index 949de273..765747a0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -42,7 +42,6 @@ allprojects { configure { versions { // override default dependency versions here - projectVersion.set(edcVersion) metaModel.set(edcVersion) } publish.set(false) @@ -53,7 +52,6 @@ allprojects { configDirectory.set(rootProject.file("resources")) } - // EdcRuntimeExtension uses this to determine the runtime classpath of the module to run. tasks.register("printClasspath") { doLast { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7ee6ab53..87dbe8cd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -8,6 +8,7 @@ edc = "0.2.1" jakarta-json = "2.0.1" junit-pioneer = "2.0.1" jupiter = "5.10.0" +okhttp-mockwebserver = "5.0.0-alpha.11" openTelemetry = "1.18.0" restAssured = "5.3.1" rsApi = "3.1.0" @@ -40,6 +41,7 @@ edc-http = { module = "org.eclipse.edc:http", version.ref = "edc" } edc-iam-mock = { module = "org.eclipse.edc:iam-mock", version.ref = "edc" } edc-jersey-micrometer = { module = "org.eclipse.edc:jersey-micrometer", version.ref = "edc" } edc-jetty-micrometer = { module = "org.eclipse.edc:jetty-micrometer", version.ref = "edc" } +edc-json-ld = { module = "org.eclipse.edc:json-ld", version.ref = "edc" } edc-junit = { module = "org.eclipse.edc:junit", version.ref = "edc" } edc-management-api = { module = "org.eclipse.edc:management-api", version.ref = "edc" } edc-micrometer-core = { module = "org.eclipse.edc:micrometer-core", version.ref = "edc" } @@ -59,6 +61,7 @@ junit-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.re junit-jupiter-engine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "jupiter" } junit-jupiter-params = { module = "org.junit.jupiter:junit-jupiter-params", version.ref = "jupiter" } junit-pioneer = { module = "org.junit-pioneer:junit-pioneer", version.ref = "junit-pioneer" } +okhttp-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp-mockwebserver" } opentelemetry-annotations = { module = "io.opentelemetry:opentelemetry-extension-annotations", version.ref = "openTelemetry" } restAssured = { module = "io.rest-assured:rest-assured", version.ref = "restAssured" } diff --git a/other/custom-runtime/build.gradle.kts b/other/custom-runtime/build.gradle.kts index 4e841633..9868d2c5 100644 --- a/other/custom-runtime/build.gradle.kts +++ b/other/custom-runtime/build.gradle.kts @@ -15,7 +15,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/settings.gradle.kts b/settings.gradle.kts index b4b60530..5d3eb165 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -54,10 +54,12 @@ include(":transfer:transfer-05-file-transfer-cloud:cloud-transfer-provider") include(":transfer:transfer-05-file-transfer-cloud:transfer-file-cloud") include("transfer:transfer-06-consumer-pull-http:http-pull-connector") -include("transfer:transfer-06-consumer-pull-http:consumer-pull-backend-service") - include("transfer:transfer-07-provider-push-http:http-push-connector") -include("transfer:transfer-07-provider-push-http:provider-push-http-backend-service") + +include("transfer:streaming:streaming-01-http-to-http:streaming-01-runtime") + +include("util:http-request-logger") + // modules for code samples ------------------------------------------------------------------------ include(":other:custom-runtime") diff --git a/system-tests/build.gradle.kts b/system-tests/build.gradle.kts index 62aec820..b59b966e 100644 --- a/system-tests/build.gradle.kts +++ b/system-tests/build.gradle.kts @@ -17,10 +17,12 @@ plugins { } dependencies { - implementation(libs.edc.junit) implementation(libs.edc.transfer.spi) + testImplementation(libs.edc.junit) + testImplementation(libs.edc.json.ld) testImplementation(libs.awaitility) + testImplementation(libs.okhttp.mockwebserver) testImplementation(libs.restAssured) // runtimes @@ -31,4 +33,5 @@ dependencies { testCompileOnly(project(":transfer:transfer-01-file-transfer:file-transfer-provider")) testCompileOnly(project(":transfer:transfer-02-file-transfer-listener:file-transfer-listener-consumer")) testCompileOnly(project(":transfer:transfer-03-modify-transferprocess:modify-transferprocess-consumer")) + testCompileOnly(project(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime")) } diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/FileTransferSampleTestCommon.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/FileTransferSampleTestCommon.java index 41ca7f18..be7d0474 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/FileTransferSampleTestCommon.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/FileTransferSampleTestCommon.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.Paths; import java.time.Duration; import java.util.Map; @@ -36,7 +37,7 @@ import static org.hamcrest.Matchers.not; /** - * Encapsulates common settings, test steps, and helper methods for the test for samples 4.x. + * Encapsulates common settings, test steps, and helper methods for transfer samples */ public class FileTransferSampleTestCommon { @@ -78,6 +79,19 @@ public static File getFileFromRelativePath(String relativePath) { return new File(TestUtils.findBuildRoot(), relativePath); } + /** + * Resolves a {@link File} instance from a relative path. + */ + @NotNull + public static String getFileContentFromRelativePath(String relativePath) { + var fileFromRelativePath = getFileFromRelativePath(relativePath); + try { + return Files.readString(Paths.get(fileFromRelativePath.toURI())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * Assert that prerequisites are fulfilled before running the test. * This assertion checks only whether the file to be copied is not existing already. diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Participant.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Participant.java new file mode 100644 index 00000000..e93e721c --- /dev/null +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Participant.java @@ -0,0 +1,453 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.samples.transfer.streaming; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.restassured.specification.RequestSpecification; +import jakarta.json.Json; +import jakarta.json.JsonArray; +import jakarta.json.JsonObject; +import org.eclipse.edc.connector.contract.spi.ContractId; +import org.eclipse.edc.jsonld.TitaniumJsonLd; +import org.eclipse.edc.jsonld.spi.JsonLd; +import org.eclipse.edc.jsonld.util.JacksonJsonLd; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.monitor.ConsoleMonitor; + +import java.net.URI; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static io.restassured.RestAssured.given; +import static io.restassured.http.ContentType.JSON; +import static jakarta.json.Json.createArrayBuilder; +import static jakarta.json.Json.createObjectBuilder; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.FINALIZED; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DATASET_ATTRIBUTE; +import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.ODRL_POLICY_ATTRIBUTE; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.spi.CoreConstants.EDC_PREFIX; + +/** + * Essentially a wrapper around the management API enabling to test interactions with other participants, eg. catalog, transfer... + */ +public class Participant { + + private static final String DSP_PROTOCOL = "dataspace-protocol-http"; + private static final Duration TIMEOUT = Duration.ofSeconds(30); + + protected String id; + protected String name; + protected Endpoint managementEndpoint; + protected Endpoint protocolEndpoint; + protected Endpoint controlEndpoint; + protected JsonLd jsonLd; + protected ObjectMapper objectMapper; + + protected Participant() { + } + + public String getName() { + return name; + } + + public void registerDataPlane(List sourceTypes, List destinationTypes) { + var jsonObject = Json.createObjectBuilder() + .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) + .add(ID, UUID.randomUUID().toString()) + .add(EDC_NAMESPACE + "url", controlEndpoint.url + "/transfer") + .add(EDC_NAMESPACE + "allowedSourceTypes", createArrayBuilder(sourceTypes)) + .add(EDC_NAMESPACE + "allowedDestTypes", createArrayBuilder(destinationTypes)) + .build(); + + managementEndpoint.baseRequest() + .contentType(JSON) + .body(jsonObject.toString()) + .when() + .post("/v2/dataplanes") + .then() + .statusCode(204); + } + + public String createAsset(String requestBody) { + return managementEndpoint.baseRequest() + .contentType(JSON) + .body(requestBody) + .when() + .post("/v3/assets") + .then() + .statusCode(200) + .contentType(JSON) + .extract().jsonPath().getString(ID); + } + + public String createPolicyDefinition(String requestBody) { + return managementEndpoint.baseRequest() + .contentType(JSON) + .body(requestBody) + .when() + .post("/v2/policydefinitions") + .then() + .statusCode(200) + .contentType(JSON) + .extract().jsonPath().getString(ID); + } + + public String createContractDefinition(String requestBody) { + return managementEndpoint.baseRequest() + .contentType(JSON) + .body(requestBody) + .when() + .post("/v2/contractdefinitions") + .then() + .statusCode(200) + .extract().jsonPath().getString(ID); + } + + /** + * Request provider catalog. + * + * @param provider data provider + * @return list of {@link org.eclipse.edc.catalog.spi.Dataset}. + */ + public JsonArray getCatalogDatasets(Participant provider) { + var datasetReference = new AtomicReference(); + var requestBody = createObjectBuilder() + .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) + .add(TYPE, "CatalogRequest") + .add("counterPartyAddress", provider.protocolEndpoint.url.toString()) + .add("protocol", DSP_PROTOCOL) + .build(); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var response = managementEndpoint.baseRequest() + .contentType(JSON) + .when() + .body(requestBody) + .post("/v2/catalog/request") + .then() + .log().all() + .statusCode(200) + .extract().body().asString(); + + var responseBody = objectMapper.readValue(response, JsonObject.class); + + var catalog = jsonLd.expand(responseBody).orElseThrow(f -> new EdcException(f.getFailureDetail())); + + var datasets = catalog.getJsonArray(DCAT_DATASET_ATTRIBUTE); + assertThat(datasets).hasSizeGreaterThan(0); + + datasetReference.set(datasets); + }); + + return datasetReference.get(); + } + + /** + * Get first {@link org.eclipse.edc.catalog.spi.Dataset} from provider matching the given asset id. + * + * @param provider data provider + * @param assetId asset id + * @return dataset. + */ + public JsonObject getDatasetForAsset(Participant provider, String assetId) { + var datasetReference = new AtomicReference(); + var requestBody = createObjectBuilder() + .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) + .add(TYPE, "DatasetRequest") + .add(ID, assetId) + .add("counterPartyAddress", provider.protocolEndpoint.url.toString()) + .add("protocol", DSP_PROTOCOL) + .build(); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var response = managementEndpoint.baseRequest() + .contentType(JSON) + .when() + .body(requestBody) + .post("/v2/catalog/dataset/request") + .then() + .log().all() + .statusCode(200) + .extract().body().asString(); + + var compacted = objectMapper.readValue(response, JsonObject.class); + + var dataset = jsonLd.expand(compacted).orElseThrow(f -> new EdcException(f.getFailureDetail())); + + datasetReference.set(dataset); + }); + + return datasetReference.get(); + } + + /** + * Initiate negotiation with a provider. + * + * @param provider data provider + * @param offerId contract definition id + * @param assetId asset id + * @param policy policy + * @return id of the contract agreement. + */ + public String negotiateContract(Participant provider, String offerId, String assetId, JsonObject policy) { + var requestBody = createObjectBuilder() + .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) + .add(TYPE, "ContractRequestDto") + .add("providerId", provider.id) + .add("connectorAddress", provider.protocolEndpoint.url.toString()) + .add("protocol", DSP_PROTOCOL) + .add("offer", createObjectBuilder() + .add("offerId", offerId) + .add("assetId", assetId) + .add("policy", jsonLd.compact(policy).getContent()) + ) + .build(); + + var negotiationId = managementEndpoint.baseRequest() + .contentType(JSON) + .body(requestBody) + .when() + .post("/v2/contractnegotiations") + .then() + .statusCode(200) + .extract().body().jsonPath().getString(ID); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var state = getContractNegotiationState(negotiationId); + assertThat(state).isEqualTo(FINALIZED.name()); + }); + + return getContractAgreementId(negotiationId); + } + + /** + * Initiate data transfer. + * + * @param provider data provider + * @param contractAgreementId contract agreement id + * @param assetId asset id + * @param privateProperties private properties + * @param destination data destination address + * @return id of the transfer process. + */ + public String initiateTransfer(Participant provider, String contractAgreementId, String assetId, JsonObject privateProperties, JsonObject destination) { + var requestBody = createObjectBuilder() + .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) + .add(TYPE, "TransferRequest") + .add("dataDestination", destination) + .add("protocol", DSP_PROTOCOL) + .add("assetId", assetId) + .add("contractId", contractAgreementId) + .add("connectorId", provider.id) + .add("connectorAddress", provider.protocolEndpoint.url.toString()) + .add("privateProperties", privateProperties) + .build(); + + return managementEndpoint.baseRequest() + .contentType(JSON) + .body(requestBody) + .when() + .post("/v2/transferprocesses") + .then() + .log().ifError() + .statusCode(200) + .extract().body().jsonPath().getString(ID); + } + + /** + * Request a provider asset: + * - retrieves the contract definition associated with the asset, + * - handles the contract negotiation, + * - initiate the data transfer. + * + * @param provider data provider + * @param assetId asset id + * @param privateProperties private properties of the data request + * @param destination data destination + * @return transfer process id. + */ + public String requestAsset(Participant provider, String assetId, JsonObject privateProperties, JsonObject destination) { + var dataset = getDatasetForAsset(provider, assetId); + var policy = dataset.getJsonArray(ODRL_POLICY_ATTRIBUTE).get(0).asJsonObject(); + var contractDefinitionId = ContractId.parseId(policy.getString(ID)) + .orElseThrow(failure -> new RuntimeException(failure.getFailureDetail())); + var contractAgreementId = negotiateContract(provider, contractDefinitionId.toString(), assetId, policy); + var transferProcessId = initiateTransfer(provider, contractAgreementId, assetId, privateProperties, destination); + assertThat(transferProcessId).isNotNull(); + return transferProcessId; + } + + /** + * Get current state of a transfer process. + * + * @param id transfer process id + * @return state of the transfer process. + */ + public String getTransferProcessState(String id) { + return managementEndpoint.baseRequest() + .contentType(JSON) + .when() + .get("/v2/transferprocesses/{id}/state", id) + .then() + .statusCode(200) + .extract().body().jsonPath().getString("'edc:state'"); + } + + private ContractId extractContractDefinitionId(JsonObject dataset) { + var contractId = dataset.getJsonArray(ODRL_POLICY_ATTRIBUTE).get(0).asJsonObject().getString(ID); + return ContractId.parseId(contractId).orElseThrow(f -> new RuntimeException(f.getFailureDetail())); + } + + private String getContractNegotiationState(String id) { + return managementEndpoint.baseRequest() + .contentType(JSON) + .when() + .get("/v2/contractnegotiations/{id}/state", id) + .then() + .statusCode(200) + .extract().body().jsonPath().getString("'edc:state'"); + } + + + private String getContractAgreementId(String negotiationId) { + var contractAgreementIdAtomic = new AtomicReference(); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var agreementId = getContractNegotiationField(negotiationId, "contractAgreementId"); + assertThat(agreementId).isNotNull().isInstanceOf(String.class); + + contractAgreementIdAtomic.set(agreementId); + }); + + var contractAgreementId = contractAgreementIdAtomic.get(); + assertThat(id).isNotEmpty(); + return contractAgreementId; + } + + private String getContractNegotiationField(String negotiationId, String fieldName) { + return managementEndpoint.baseRequest() + .contentType(JSON) + .when() + .get("/v2/contractnegotiations/{id}", negotiationId) + .then() + .statusCode(200) + .extract().body().jsonPath() + .getString(format("'edc:%s'", fieldName)); + } + + /** + * Represent an endpoint exposed by a {@link Participant}. + */ + public static class Endpoint { + private final URI url; + private final Map headers; + + public Endpoint(URI url) { + this.url = url; + this.headers = new HashMap<>(); + } + + public Endpoint(URI url, Map headers) { + this.url = url; + this.headers = headers; + } + + public RequestSpecification baseRequest() { + return given().baseUri(url.toString()).headers(headers); + } + + public URI getUrl() { + return url; + } + } + + public static class Builder

> { + protected final P participant; + + protected Builder(P participant) { + this.participant = participant; + } + + public static > Builder newInstance() { + return new Builder<>(new Participant()); + } + + public B id(String id) { + participant.id = id; + return self(); + } + + public B name(String name) { + participant.name = name; + return self(); + } + + public B managementEndpoint(Endpoint managementEndpoint) { + participant.managementEndpoint = managementEndpoint; + return self(); + } + + public B protocolEndpoint(Endpoint protocolEndpoint) { + participant.protocolEndpoint = protocolEndpoint; + return self(); + } + + public B controlEndpoint(Endpoint controlEndpoint) { + participant.controlEndpoint = controlEndpoint; + return self(); + } + + public B jsonLd(JsonLd jsonLd) { + participant.jsonLd = jsonLd; + return self(); + } + + public B objectMapper(ObjectMapper objectMapper) { + participant.objectMapper = objectMapper; + return self(); + } + + public Participant build() { + Objects.requireNonNull(participant.id, "id"); + Objects.requireNonNull(participant.name, "name"); + Objects.requireNonNull(participant.managementEndpoint, "managementEndpoint"); + Objects.requireNonNull(participant.protocolEndpoint, "protocolEndpoint"); + if (participant.jsonLd == null) { + participant.jsonLd = new TitaniumJsonLd(new ConsoleMonitor()); + } + if (participant.objectMapper == null) { + participant.objectMapper = JacksonJsonLd.createObjectMapper(); + } + return participant; + } + + @SuppressWarnings("unchecked") + private B self() { + return (B) this; + } + } +} diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java new file mode 100644 index 00000000..070fefcb --- /dev/null +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample + * + */ + +package org.eclipse.edc.samples.transfer.streaming; + +import jakarta.json.Json; +import okhttp3.mockwebserver.MockWebServer; +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.eclipse.edc.junit.testfixtures.TestUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileContentFromRelativePath; +import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileFromRelativePath; + +@EndToEndTest +public class Streaming01httpToHttpTest { + + private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-01-http-to-http"; + private static final Duration TIMEOUT = Duration.ofSeconds(30); + + private static final Participant PROVIDER = Participant.Builder.newInstance() + .name("provider") + .id("provider") + .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management"))) + .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol"))) + .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control"))) + .build(); + + private static final Participant CONSUMER = Participant.Builder.newInstance() + .name("consumer") + .id("consumer") + .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management"))) + .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol"))) + .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control"))) + .build(); + + @RegisterExtension + static EdcRuntimeExtension providerConnector = new EdcRuntimeExtension( + ":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime", + "provider", + Map.of( + "edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-01-runtime/provider.properties").getAbsolutePath() + ) + ); + + @RegisterExtension + static EdcRuntimeExtension consumerConnector = new EdcRuntimeExtension( + ":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime", + "provider", + Map.of( + "edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-01-runtime/consumer.properties").getAbsolutePath() + ) + ); + private final int httpReceiverPort = TestUtils.getFreePort(); + private final MockWebServer consumerReceiverServer = new MockWebServer(); + + @BeforeEach + void setUp() throws IOException { + consumerReceiverServer.start(httpReceiverPort); + } + + @Test + void streamData() throws IOException { + var source = Files.createTempDirectory("source"); + PROVIDER.registerDataPlane(List.of("HttpStreaming"), List.of("HttpData")); + + PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/asset.json") + .replace("{{sourceFolder}}", source.toString())); + PROVIDER.createPolicyDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/policy-definition.json")); + PROVIDER.createContractDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/contract-definition.json")); + + var destination = Json.createObjectBuilder() + .add("type", "HttpData") + .add("baseUrl", "http://localhost:" + httpReceiverPort) + .build(); + var transferProcessId = CONSUMER.requestAsset(PROVIDER, "stream-asset", Json.createObjectBuilder().build(), destination); + + await().atMost(TIMEOUT).untilAsserted(() -> { + String state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); + }); + + var eventBody = "message that will be sent".getBytes(); + Files.write(source.resolve("message-" + UUID.randomUUID()), eventBody); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var request = consumerReceiverServer.takeRequest(); + assertThat(request).isNotNull(); + assertThat(request.getBody().readByteArray()).isEqualTo(eventBody); + }); + } +} diff --git a/transfer/streaming/streaming-01-http-to-http/.gitignore b/transfer/streaming/streaming-01-http-to-http/.gitignore new file mode 100644 index 00000000..3db92cfa --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/.gitignore @@ -0,0 +1 @@ +source/* diff --git a/transfer/streaming/streaming-01-http-to-http/README.md b/transfer/streaming/streaming-01-http-to-http/README.md new file mode 100644 index 00000000..9beee036 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/README.md @@ -0,0 +1,155 @@ +# Streaming HTTP to HTTP + +This sample will show how you can set up the EDC to stream messages from HTTP to HTTP. +This code is only for demonstration purposes and should not be used in production. + +## Concept +We will build a data-plane `DataSource` extension that will retrieve new data from a disk folder and push it +to every consumer that has started a `TransferProcess` for a related asset. + +### Run + +Build the connector runtime, which will be used both for the provider and consumer: +```shell +./gradlew :transfer:streaming:streaming-01-http-to-http:streaming-01-runtime:build +``` + +Run the provider and the consumer, which must be started from different terminal shells: +```shell +# provider +export EDC_FS_CONFIG=transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/provider.properties +java -jar transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build/libs/connector.jar + +#consumer +export EDC_FS_CONFIG=transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/consumer.properties +java -jar transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build/libs/connector.jar +``` + +#### Register Data Plane on provider +The provider connector needs to be aware of the streaming capabilities of the embedded dataplane, which can be registered with +this call: +```js +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes" +``` + +If you look at the `dataplane.json` you'll notice that the supported source is `HttpStreaming` and the supported sink is `HttpData`. + +#### Register Asset, Policy Definition and Contract Definition on provider +A "source" folder must first be created where the data plane will get the messages to be sent to the consumers. +To do this, create a temp folder: +```shell +mkdir /tmp/source +``` + +Then put the path in the [asset.json](asset.json) file replacing the `{{sourceFolder}}` placeholder. +```json + "dataAddress": { + "type": "HttpStreaming", + "sourceFolder": "{{sourceFolder}}" + } +``` + +Then create the Asset, the Policy Definition and the Contract Definition with these three calls: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/asset.json -X POST "http://localhost:18181/management/v3/assets" +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" +``` + +#### Negotiate the contract +The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a contract. +However, in this sample case, we already have the provider asset (`"stream-asset"`) so we can get the related dataset +directly with this call: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq +``` + +The output will be something like: +```json +{ + "@id": "stream-asset", + "@type": "dcat:Dataset", + "odrl:hasPolicy": { + "@id": "Y29udHJhY3QtZGVmaW5pdGlvbg==:c3RyZWFtLWFzc2V0:NDlhYTUzZWEtMDUzMS00ZDkyLTg4Y2YtMGRjMTc4MmQ1NjY4", + "@type": "odrl:Set", + "odrl:permission": [], + "odrl:prohibition": [], + "odrl:obligation": [], + "odrl:target": "stream-asset" + }, + "dcat:distribution": { + "@type": "dcat:Distribution", + "dct:format": { + "@id": "HttpData" + }, + "dcat:accessService": "b24dfdbc-d17f-4d6e-9b5c-8fa71dacecfc" + }, + "edc:id": "stream-asset", + "@context": { + "dct": "https://purl.org/dc/terms/", + "edc": "https://w3id.org/edc/v0.0.1/ns/", + "dcat": "https://www.w3.org/ns/dcat/", + "odrl": "http://www.w3.org/ns/odrl/2/", + "dspace": "https://w3id.org/dspace/v0.8/" + } +} +``` + +With the `odrl:hasPolicy/@id` we can now replace it in the [negotiate-contract.json](negotiate-contract.json) file +and request the contract negotiation: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq +``` + +### Start the transfer +First we need to set up the receiver server on the consumer side that will receive a call for every message. For this +you'll need to open another terminal shell and run: +```shell +./gradlew util:http-request-logger:build +HTTP_SERVER_PORT=4000 java -jar util/http-request-logger/build/libs/http-request-logger.jar +``` +It will run on port 4000. + +At this point the contract agreement should already been issued, to verify that, please check the contract negotiation state with +this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call. +```shell +curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negotiation-id}}" -s | jq +``` + +If the `edc:contractAgreementId` is valued, it can be used to start the transfer, replacing it in the [transfer.json](transfer.json) +file to `{{contract-agreement-id}}` and then calling the connector with this command: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq +``` +> Note that the destination address is `localhost:4000`, this because is where our http server is listening. + + +Let's wait until the transfer state is `STARTED` state executing this call, replacing to `{{transfer-process-id}}` the id returned +by the start transfer call: +```shell +curl "http://localhost:28181/management/v2/transferprocesses/{{transfer-process-id}}" -s | jq +``` + +Here we can test the transfer creating a file into the `source` folder that we configured before, e.g. copying the `README.md` +into the `source` folder: +```shell +cp README.md /tmp/source +``` + +we should see the content logged into the received server: +``` +Incoming request +Method: POST +Path: / +Body: +# EDC Samples +... +``` +### Up to you: second connector + +As a challenge, try starting another consumer connector, negotiating a contract, and starting the transfer. +Every message pushed by the provider will be sent to all the consumers. + +## Technical insight + +The required code is contained in the [`streaming-01-runtime` source folder](transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http). diff --git a/transfer/streaming/streaming-01-http-to-http/asset.json b/transfer/streaming/streaming-01-http-to-http/asset.json new file mode 100644 index 00000000..e9e55303 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/asset.json @@ -0,0 +1,10 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@id": "stream-asset", + "properties": { + }, + "dataAddress": { + "type": "HttpStreaming", + "sourceFolder": "{{sourceFolder}}" + } +} diff --git a/transfer/streaming/streaming-01-http-to-http/contract-definition.json b/transfer/streaming/streaming-01-http-to-http/contract-definition.json new file mode 100644 index 00000000..d424ec90 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/contract-definition.json @@ -0,0 +1,7 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@id": "contract-definition", + "accessPolicyId": "no-constraint-policy", + "contractPolicyId": "no-constraint-policy", + "assetsSelector": [] +} diff --git a/transfer/streaming/streaming-01-http-to-http/dataplane.json b/transfer/streaming/streaming-01-http-to-http/dataplane.json new file mode 100644 index 00000000..09cc85a1 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/dataplane.json @@ -0,0 +1,9 @@ +{ + "@context": { + "edc": "https://w3id.org/edc/v0.0.1/ns/" + }, + "@id": "http-pull-provider-dataplane", + "url": "http://localhost:19192/control/transfer", + "allowedSourceTypes": [ "HttpStreaming" ], + "allowedDestTypes": [ "HttpData" ] +} diff --git a/transfer/streaming/streaming-01-http-to-http/get-dataset.json b/transfer/streaming/streaming-01-http-to-http/get-dataset.json new file mode 100644 index 00000000..94a12236 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/get-dataset.json @@ -0,0 +1,7 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@type": "DatasetRequest", + "@id": "stream-asset", + "counterPartyAddress": "http://localhost:18182/protocol", + "protocol": "dataspace-protocol-http" +} diff --git a/transfer/streaming/streaming-01-http-to-http/negotiate-contract.json b/transfer/streaming/streaming-01-http-to-http/negotiate-contract.json new file mode 100644 index 00000000..31486f96 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/negotiate-contract.json @@ -0,0 +1,22 @@ +{ + "@context": { + "edc": "https://w3id.org/edc/v0.0.1/ns/", + "odrl": "http://www.w3.org/ns/odrl/2/" + }, + "@type": "NegotiationInitiateRequestDto", + "connectorAddress": "http://localhost:18182/protocol", + "providerId": "provider", + "protocol": "dataspace-protocol-http", + "offer": { + "offerId": "{{offerId}}", + "assetId": "stream-asset", + "policy": { + "@id": "{{offerId}}", + "@type": "use", + "odrl:permission": [], + "odrl:prohibition": [], + "odrl:obligation": [], + "odrl:target": "stream-asset" + } + } +} diff --git a/transfer/streaming/streaming-01-http-to-http/policy-definition.json b/transfer/streaming/streaming-01-http-to-http/policy-definition.json new file mode 100644 index 00000000..e1bdd6af --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/policy-definition.json @@ -0,0 +1,8 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@id": "no-constraint-policy", + "policy": { + "@context": "http://www.w3.org/ns/odrl.jsonld", + "@type": "use" + } +} diff --git a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build.gradle.kts b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build.gradle.kts new file mode 100644 index 00000000..99333cca --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build.gradle.kts @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020, 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - initial API and implementation + * Fraunhofer Institute for Software and Systems Engineering - added dependencies + * ZF Friedrichshafen AG - add dependency + * + */ + +plugins { + `java-library` + id("application") + alias(libs.plugins.shadow) +} + +dependencies { + implementation(libs.edc.control.plane.core) + implementation(libs.edc.data.plane.selector.core) + implementation(libs.edc.api.observability) + implementation(libs.edc.configuration.filesystem) + implementation(libs.edc.iam.mock) + implementation(libs.edc.management.api) + implementation(libs.edc.dsp) + implementation(libs.edc.data.plane.selector.api) + implementation(libs.edc.data.plane.selector.client) + implementation(libs.edc.transfer.data.plane) + implementation(libs.edc.data.plane.spi) + implementation(libs.edc.data.plane.core) + implementation(libs.edc.data.plane.http) +} + +application { + mainClass.set("org.eclipse.edc.boot.system.runtime.BaseRuntime") +} + +tasks.withType { + mergeServiceFiles() + archiveFileName.set("connector.jar") +} diff --git a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/consumer.properties b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/consumer.properties new file mode 100644 index 00000000..a778cd4d --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/consumer.properties @@ -0,0 +1,11 @@ +web.http.port=28180 +web.http.path=/api +web.http.management.port=28181 +web.http.management.path=/management +web.http.protocol.port=28182 +web.http.protocol.path=/protocol +web.http.control.port=28183 +web.http.control.path=/control +edc.dsp.callback.address=http://localhost:28182/protocol +edc.participant.id=consumer +edc.ids.id=urn:connector:consumer diff --git a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/provider.properties b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/provider.properties new file mode 100644 index 00000000..a357378a --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/provider.properties @@ -0,0 +1,12 @@ +web.http.port=18180 +web.http.path=/api +web.http.management.port=18181 +web.http.management.path=/management +web.http.protocol.port=18182 +web.http.protocol.path=/protocol +web.http.control.port=18183 +web.http.control.path=/control +edc.dsp.callback.address=http://localhost:18182/protocol +edc.participant.id=provider +edc.ids.id=urn:connector:provider +edc.dataplane.http.sink.partition.size=1 diff --git a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSource.java b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSource.java new file mode 100644 index 00000000..018b0f64 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSource.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample + * + */ + +package org.eclipse.edc.samples.transfer.streaming.http; + +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.jetbrains.annotations.NotNull; + +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.Objects; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.util.stream.StreamSupport.stream; + +/** + * Takes care to activate a {@link WatchService} and instantiate a {@link Spliterator} on it, generating a `Stream` of + * `Part`s that the data plane will then pipe to the configured destination (in our sample it is a `HttpData` address) + */ +public class HttpStreamingDataSource implements DataSource, Closeable { + + private final WatchService watchService; + private final File sourceFolder; + + public HttpStreamingDataSource(File sourceFolder) { + try { + this.watchService = FileSystems.getDefault().newWatchService(); + this.sourceFolder = sourceFolder; + Paths.get(this.sourceFolder.toURI()).register(watchService, ENTRY_CREATE, ENTRY_MODIFY); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public StreamResult> openPartStream() { + Stream stream = openRecordsStream(watchService).filter(Objects::nonNull) + .flatMap(it -> it.pollEvents().stream()) + .map(it -> it.context().toString()) + .map(it -> Paths.get(sourceFolder.toURI()).resolve(it)) + .map(StreamingPart::new); + + return StreamResult.success(stream); + } + + @NotNull + private Stream openRecordsStream(WatchService watchService) { + return stream(new WatchKeyAbstractSpliterator(watchService), false); + } + + @Override + public void close() throws IOException { + watchService.close(); + } + + private record StreamingPart(Path path) implements Part { + + @Override + public String name() { + return path.toString(); + } + + @Override + public InputStream openStream() { + try { + return new FileInputStream(path.toFile()); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + } + } + + private static class WatchKeyAbstractSpliterator extends Spliterators.AbstractSpliterator { + + private final WatchService watchService; + + WatchKeyAbstractSpliterator(WatchService watchService) { + super(Long.MAX_VALUE, Spliterator.ORDERED); + this.watchService = watchService; + } + + @Override + public boolean tryAdvance(Consumer action) { + var poll = watchService.poll(); + while (poll == null) { + try { + poll = watchService.poll(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + action.accept(poll); + poll.reset(); + return true; + } + } +} diff --git a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java new file mode 100644 index 00000000..45c2af8c --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample + * + */ + +package org.eclipse.edc.samples.transfer.streaming.http; + +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; +import org.jetbrains.annotations.NotNull; + +import java.io.File; +import java.util.Optional; + +/** + * Handles `HttpStreaming` sources, validating the address and creating the specific {@link HttpStreamingDataSource} + * instance on every {@link TransferProcess} started. + */ +public class HttpStreamingDataSourceFactory implements DataSourceFactory { + + @Override + public boolean canHandle(DataFlowRequest request) { + return request.getSourceDataAddress().getType().equals("HttpStreaming"); + } + + @Override + public DataSource createSource(DataFlowRequest request) { + return new HttpStreamingDataSource(sourceFolder(request).get()); + } + + @Override + public @NotNull Result validateRequest(DataFlowRequest request) { + return sourceFolder(request) + .map(it -> Result.success()) + .orElseGet(() -> Result.failure("sourceFolder is not found or it does not exist")); + } + + private Optional sourceFolder(DataFlowRequest request) { + return Optional.of(request) + .map(DataFlowRequest::getSourceDataAddress) + .map(it -> it.getStringProperty("sourceFolder")) + .map(File::new) + .filter(File::exists); + } + +} diff --git a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingExtension.java b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingExtension.java new file mode 100644 index 00000000..cddbc761 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingExtension.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample + * + */ + +package org.eclipse.edc.samples.transfer.streaming.http; + +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; + +/** + * Register the {@link HttpStreamingDataSourceFactory} to the data plane pipeline service + */ +public class HttpStreamingExtension implements ServiceExtension { + + @Override + public String name() { + return "Http Streaming"; + } + + @Inject + private PipelineService pipelineService; + + @Override + public void initialize(ServiceExtensionContext context) { + pipelineService.registerFactory(new HttpStreamingDataSourceFactory()); + } +} diff --git a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000..ae563032 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1 @@ +org.eclipse.edc.samples.transfer.streaming.http.HttpStreamingExtension diff --git a/transfer/streaming/streaming-01-http-to-http/transfer.json b/transfer/streaming/streaming-01-http-to-http/transfer.json new file mode 100644 index 00000000..fc7e3937 --- /dev/null +++ b/transfer/streaming/streaming-01-http-to-http/transfer.json @@ -0,0 +1,15 @@ +{ + "@context": { + "edc": "https://w3id.org/edc/v0.0.1/ns/" + }, + "@type": "TransferRequest", + "dataDestination": { + "type": "HttpData", + "baseUrl": "http://localhost:4000" + }, + "protocol": "dataspace-protocol-http", + "assetId": "stream-asset", + "contractId": "{{contract-agreement-id}}", + "connectorId": "provider", + "connectorAddress": "http://localhost:18182/protocol" +} diff --git a/transfer/transfer-01-file-transfer/file-transfer-consumer/build.gradle.kts b/transfer/transfer-01-file-transfer/file-transfer-consumer/build.gradle.kts index f4c174b2..5a54e632 100644 --- a/transfer/transfer-01-file-transfer/file-transfer-consumer/build.gradle.kts +++ b/transfer/transfer-01-file-transfer/file-transfer-consumer/build.gradle.kts @@ -17,7 +17,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/transfer/transfer-01-file-transfer/file-transfer-provider/build.gradle.kts b/transfer/transfer-01-file-transfer/file-transfer-provider/build.gradle.kts index 8f197caa..1b6a2b45 100644 --- a/transfer/transfer-01-file-transfer/file-transfer-provider/build.gradle.kts +++ b/transfer/transfer-01-file-transfer/file-transfer-provider/build.gradle.kts @@ -17,7 +17,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/transfer/transfer-01-file-transfer/filetransfer.json b/transfer/transfer-01-file-transfer/filetransfer.json index ebd6a8a0..035bbb49 100644 --- a/transfer/transfer-01-file-transfer/filetransfer.json +++ b/transfer/transfer-01-file-transfer/filetransfer.json @@ -13,6 +13,5 @@ "contractId": "{agreement ID}", "connectorId": "provider", "connectorAddress": "http://localhost:8282/protocol", - "privateProperties": {}, - "managedResources": false + "privateProperties": {} } diff --git a/transfer/transfer-02-file-transfer-listener/file-transfer-listener-consumer/build.gradle.kts b/transfer/transfer-02-file-transfer-listener/file-transfer-listener-consumer/build.gradle.kts index 77edb522..e8e9e82c 100644 --- a/transfer/transfer-02-file-transfer-listener/file-transfer-listener-consumer/build.gradle.kts +++ b/transfer/transfer-02-file-transfer-listener/file-transfer-listener-consumer/build.gradle.kts @@ -17,7 +17,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/transfer/transfer-03-modify-transferprocess/modify-transferprocess-consumer/build.gradle.kts b/transfer/transfer-03-modify-transferprocess/modify-transferprocess-consumer/build.gradle.kts index 2675d51f..ec9f22e7 100644 --- a/transfer/transfer-03-modify-transferprocess/modify-transferprocess-consumer/build.gradle.kts +++ b/transfer/transfer-03-modify-transferprocess/modify-transferprocess-consumer/build.gradle.kts @@ -17,7 +17,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/transfer/transfer-04-open-telemetry/filetransfer.json b/transfer/transfer-04-open-telemetry/filetransfer.json index 777e3781..3a2bfba1 100644 --- a/transfer/transfer-04-open-telemetry/filetransfer.json +++ b/transfer/transfer-04-open-telemetry/filetransfer.json @@ -11,7 +11,6 @@ "contentType": "application/octet-stream", "isFinite": true }, - "managedResources": false, "connectorAddress": "http://provider:8282/protocol", "connectorId": "consumer" } diff --git a/transfer/transfer-04-open-telemetry/open-telemetry-consumer/build.gradle.kts b/transfer/transfer-04-open-telemetry/open-telemetry-consumer/build.gradle.kts index bd4cc160..709166fb 100644 --- a/transfer/transfer-04-open-telemetry/open-telemetry-consumer/build.gradle.kts +++ b/transfer/transfer-04-open-telemetry/open-telemetry-consumer/build.gradle.kts @@ -16,7 +16,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/transfer/transfer-04-open-telemetry/open-telemetry-provider/build.gradle.kts b/transfer/transfer-04-open-telemetry/open-telemetry-provider/build.gradle.kts index 9e52967f..7c5dcee7 100644 --- a/transfer/transfer-04-open-telemetry/open-telemetry-provider/build.gradle.kts +++ b/transfer/transfer-04-open-telemetry/open-telemetry-provider/build.gradle.kts @@ -15,7 +15,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/transfer/transfer-05-file-transfer-cloud/README.md b/transfer/transfer-05-file-transfer-cloud/README.md index 172a9efd..8171d48c 100644 --- a/transfer/transfer-05-file-transfer-cloud/README.md +++ b/transfer/transfer-05-file-transfer-cloud/README.md @@ -162,7 +162,6 @@ curl --location --request POST 'http://localhost:9192/management/v2/transferproc "region": "us-east-1", "bucketName": "" }, - "managedResources": true, "transferType": { "contentType": "application/octet-stream", "isFinite": true diff --git a/transfer/transfer-05-file-transfer-cloud/cloud-transfer-consumer/build.gradle.kts b/transfer/transfer-05-file-transfer-cloud/cloud-transfer-consumer/build.gradle.kts index 9cb472bc..a5cf44b2 100644 --- a/transfer/transfer-05-file-transfer-cloud/cloud-transfer-consumer/build.gradle.kts +++ b/transfer/transfer-05-file-transfer-cloud/cloud-transfer-consumer/build.gradle.kts @@ -17,7 +17,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/transfer/transfer-05-file-transfer-cloud/cloud-transfer-provider/build.gradle.kts b/transfer/transfer-05-file-transfer-cloud/cloud-transfer-provider/build.gradle.kts index 5fb91dc6..b1a9847b 100644 --- a/transfer/transfer-05-file-transfer-cloud/cloud-transfer-provider/build.gradle.kts +++ b/transfer/transfer-05-file-transfer-cloud/cloud-transfer-provider/build.gradle.kts @@ -17,7 +17,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/transfer/transfer-06-consumer-pull-http/README.md b/transfer/transfer-06-consumer-pull-http/README.md index 9d8dad53..f7ad87fc 100644 --- a/transfer/transfer-06-consumer-pull-http/README.md +++ b/transfer/transfer-06-consumer-pull-http/README.md @@ -138,7 +138,7 @@ curl -H 'Content-Type: application/json' \ "https://w3id.org/edc/v0.0.1/ns/publicApiUrl": "http://localhost:19291/public/" } }' \ - -X POST "http://localhost:19193/management/v2/dataplanes" + -X POST "http://localhost:19193/management/v2/dataplanes" | -s | jq ``` ### 2. Register data plane instance for consumer @@ -427,11 +427,12 @@ Sample output: ### 9. Start the transfer -As a pre-requisite, you need to have a backend service that runs on port 4000 +As a pre-requisite, you need to have an http server that runs on port 4000 and logs all the incoming requests, it will +be mandatory to get the EndpointDataReference that will be used to get the data. ```bash -./gradlew transfer:transfer-06-consumer-pull-http:consumer-pull-backend-service:build -java -jar transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/build/libs/consumer-pull-backend-service.jar +./gradlew util:http-request-logger:build +HTTP_SERVER_PORT=4000 java -jar util/http-request-logger/build/libs/http-request-logger.jar ``` Now that we have a contract agreement, we can finally request the file. In the request body, we need @@ -455,7 +456,6 @@ curl -X POST "http://localhost:29193/management/v2/transferprocesses" \ "connectorAddress": "http://localhost:19194/protocol", "contractId": "", "assetId": "assetId", - "managedResources": false, "protocol": "dataspace-protocol-http", "dataDestination": { "type": "HttpProxy" @@ -503,10 +503,8 @@ You should see the Transfer Process in `COMPLETED` state: ### 11. Pull the data -At this step, if you look at the backend service logs, you will have a json representing -the data useful for reading the data. This is presented in the following section. - -Sample log for the Backend Service: +At this step, if you look at the http server logs, you will find a json representing the EndpointDataReference, needed +to get the data from the provider: ```json { @@ -524,8 +522,7 @@ Once this json is read, use a tool like postman or curl to execute the following data ```bash -curl --location --request GET 'http://localhost:29291/public/' \ ---header 'Authorization: ' +curl --location --request GET 'http://localhost:29291/public/' --header 'Authorization: ' ``` At the end, and to be sure that you correctly achieved the pull, you can check if the data you get @@ -535,8 +532,7 @@ is the same as the one you can get at https://jsonplaceholder.typicode.com/users Since we configured the `HttpData` with `proxyPath`, we could also ask for a specific user with: ```bash -curl --location --request GET 'http://localhost:29291/public/1' \ ---header 'Authorization: ' +curl --location --request GET 'http://localhost:29291/public/1' --header 'Authorization: ' ``` And the data returned will be the same as in https://jsonplaceholder.typicode.com/users/1 diff --git a/transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/src/main/java/org/eclipse/edc/BackendService.java b/transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/src/main/java/org/eclipse/edc/BackendService.java deleted file mode 100644 index e9aabb6b..00000000 --- a/transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/src/main/java/org/eclipse/edc/BackendService.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc; - -import com.sun.net.httpserver.HttpServer; -import org.eclipse.edc.handler.ReceiverHandler; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Optional; - -public class BackendService { - - static final String HTTP_PORT = "server.port"; - - public static void main(String[] args) { - int port = Integer.parseInt(Optional.ofNullable(System.getenv(HTTP_PORT)).orElse("4000")); - var server = createHttpServer(port); - server.createContext("/receiver", new ReceiverHandler()); - server.setExecutor(null); - server.start(); - System.out.println("server started at " + port); - } - - private static HttpServer createHttpServer(int port) { - try { - return HttpServer.create(new InetSocketAddress(port), 0); - } catch (IOException e) { - throw new RuntimeException("Unable to start server at port " + port, e); - } - } -} diff --git a/transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/src/main/java/org/eclipse/edc/handler/ReceiverHandler.java b/transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/src/main/java/org/eclipse/edc/handler/ReceiverHandler.java deleted file mode 100644 index 4a5499fa..00000000 --- a/transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/src/main/java/org/eclipse/edc/handler/ReceiverHandler.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.handler; - -import com.sun.net.httpserver.HttpExchange; -import com.sun.net.httpserver.HttpHandler; - -import java.io.IOException; - -public class ReceiverHandler implements HttpHandler { - - /** - * This method just prints the request body to the console and returns a 200 OK response. - */ - @Override - public void handle(HttpExchange exchange) throws IOException { - System.out.println("Request Body: " + new String(exchange.getRequestBody().readAllBytes())); - exchange.sendResponseHeaders(200, 0); - } -} diff --git a/transfer/transfer-06-consumer-pull-http/http-pull-connector/build.gradle.kts b/transfer/transfer-06-consumer-pull-http/http-pull-connector/build.gradle.kts index d75a45f7..a16e0935 100644 --- a/transfer/transfer-06-consumer-pull-http/http-pull-connector/build.gradle.kts +++ b/transfer/transfer-06-consumer-pull-http/http-pull-connector/build.gradle.kts @@ -15,7 +15,7 @@ plugins { `java-library` id("application") - id("com.github.johnrengelman.shadow") version "8.1.1" + alias(libs.plugins.shadow) } dependencies { diff --git a/transfer/transfer-07-provider-push-http/README.md b/transfer/transfer-07-provider-push-http/README.md index d0dd8d10..4545dd73 100644 --- a/transfer/transfer-07-provider-push-http/README.md +++ b/transfer/transfer-07-provider-push-http/README.md @@ -377,11 +377,12 @@ Sample output: ### 8. Start the transfer -As a pre-requisite, you need to have a backend service that runs on port 4000 +As a pre-requisite, you need to have an http server that runs on port 4000 and logs all the incoming requests, it will +be mandatory to get the data from the provider. ```bash -./gradlew transfer:transfer-07-provider-push-http:provider-push-http-backend-service:build -java -jar transfer/transfer-07-provider-push-http/provider-push-http-backend-service/build/libs/provider-push-http-backend-service.jar +./gradlew util:http-request-logger:build +HTTP_SERVER_PORT=4000 java -jar util/http-request-logger/build/libs/http-request-logger.jar ``` Now that we have a contract agreement, we can finally request the file. In the request body, we need @@ -404,7 +405,6 @@ curl -X POST "http://localhost:29193/management/v2/transferprocesses" \ "connectorAddress": "http://localhost:19194/protocol", "contractId": "", "assetId": "assetId", - "managedResources": false, "protocol": "dataspace-protocol-http", "dataDestination": { "type": "HttpData", @@ -441,6 +441,5 @@ curl http://localhost:29193/management/v2/transferprocesses/ { - manifest { - attributes["Main-Class"] = "org.eclipse.edc.BackendService" - } -} \ No newline at end of file diff --git a/transfer/transfer-07-provider-push-http/provider-push-http-backend-service/src/main/java/org/eclipse/edc/BackendService.java b/transfer/transfer-07-provider-push-http/provider-push-http-backend-service/src/main/java/org/eclipse/edc/BackendService.java deleted file mode 100644 index c666059b..00000000 --- a/transfer/transfer-07-provider-push-http/provider-push-http-backend-service/src/main/java/org/eclipse/edc/BackendService.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc; - -import com.sun.net.httpserver.HttpServer; -import org.eclipse.edc.handler.ReceiverHandler; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Optional; - -public class BackendService { - - static final String HTTP_PORT = "server.port"; - - public static void main(String[] args) { - int port = Integer.parseInt(Optional.ofNullable(System.getenv(HTTP_PORT)).orElse("4000")); - var server = createHttpServer(port); - server.createContext("/api/consumer/store", new ReceiverHandler()); - server.setExecutor(null); - server.start(); - System.out.println("server started at " + port); - } - - private static HttpServer createHttpServer(int port) { - try { - return HttpServer.create(new InetSocketAddress(port), 0); - } catch (IOException e) { - throw new RuntimeException("Unable to start server at port " + port, e); - } - } -} diff --git a/transfer/transfer-07-provider-push-http/provider-push-http-backend-service/src/main/java/org/eclipse/edc/handler/ReceiverHandler.java b/transfer/transfer-07-provider-push-http/provider-push-http-backend-service/src/main/java/org/eclipse/edc/handler/ReceiverHandler.java deleted file mode 100644 index 4a5499fa..00000000 --- a/transfer/transfer-07-provider-push-http/provider-push-http-backend-service/src/main/java/org/eclipse/edc/handler/ReceiverHandler.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.handler; - -import com.sun.net.httpserver.HttpExchange; -import com.sun.net.httpserver.HttpHandler; - -import java.io.IOException; - -public class ReceiverHandler implements HttpHandler { - - /** - * This method just prints the request body to the console and returns a 200 OK response. - */ - @Override - public void handle(HttpExchange exchange) throws IOException { - System.out.println("Request Body: " + new String(exchange.getRequestBody().readAllBytes())); - exchange.sendResponseHeaders(200, 0); - } -} diff --git a/util/README.md b/util/README.md new file mode 100644 index 00000000..65105c04 --- /dev/null +++ b/util/README.md @@ -0,0 +1,3 @@ +# Util + +Here there are modules that are used by different samples diff --git a/transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/build.gradle.kts b/util/http-request-logger/build.gradle.kts similarity index 74% rename from transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/build.gradle.kts rename to util/http-request-logger/build.gradle.kts index 76eeccf6..166e7896 100644 --- a/transfer/transfer-06-consumer-pull-http/consumer-pull-backend-service/build.gradle.kts +++ b/util/http-request-logger/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -18,6 +18,6 @@ plugins { tasks.withType { manifest { - attributes["Main-Class"] = "org.eclipse.edc.BackendService" + attributes["Main-Class"] = "org.eclipse.edc.samples.util.HttpRequestLoggerServer" } -} \ No newline at end of file +} diff --git a/util/http-request-logger/src/main/java/org/eclipse/edc/samples/util/HttpRequestLoggerServer.java b/util/http-request-logger/src/main/java/org/eclipse/edc/samples/util/HttpRequestLoggerServer.java new file mode 100644 index 00000000..1c4b4de9 --- /dev/null +++ b/util/http-request-logger/src/main/java/org/eclipse/edc/samples/util/HttpRequestLoggerServer.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.samples.util; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Optional; + +public class HttpRequestLoggerServer { + + static final String HTTP_PORT = "HTTP_SERVER_PORT"; + + public static void main(String[] args) { + int port = Integer.parseInt(Optional.ofNullable(System.getenv(HTTP_PORT)).orElse("4000")); + try { + var server = HttpServer.create(new InetSocketAddress(port), 0); + server.createContext("/", new ReceiverHandler()); + server.setExecutor(null); + server.start(); + System.out.println("HTTP request server logger started at " + port); + } catch (IOException e) { + throw new RuntimeException("Unable to start server at port " + port, e); + } + } + + private static class ReceiverHandler implements HttpHandler { + + @Override + public void handle(HttpExchange exchange) throws IOException { + System.out.println("Incoming request"); + System.out.println("Method: " + exchange.getRequestMethod()); + System.out.println("Path: " + exchange.getRequestURI()); + System.out.println("Body:"); + System.out.println(new String(exchange.getRequestBody().readAllBytes())); + System.out.println("============="); + exchange.sendResponseHeaders(200, 0); + } + } + +}