diff --git a/advanced/advanced-01-open-telemetry/README.md b/advanced/advanced-01-open-telemetry/README.md index a88d08fa..0ddc29b3 100644 --- a/advanced/advanced-01-open-telemetry/README.md +++ b/advanced/advanced-01-open-telemetry/README.md @@ -26,7 +26,7 @@ is configured to expose a Prometheus metrics endpoint. To run the consumer, the provider, and Jaeger execute the following commands in the project root folder: ```bash -docker-compose -f advanced/advanced-01-open-telemetry/docker-compose.yaml up --abort-on-container-exit +docker compose -f advanced/advanced-01-open-telemetry/docker-compose.yaml up --abort-on-container-exit ``` Open a new terminal. @@ -54,7 +54,7 @@ Create an asset: ```bash curl -H "X-Api-Key: password" \ -d @transfer/transfer-01-negotiation/resources/create-asset.json \ - -H 'content-type: application/json' http://localhost:19193/management/v2/assets \ + -H 'content-type: application/json' http://localhost:19193/management/v3/assets \ -s | jq ``` @@ -76,7 +76,63 @@ curl -H "X-Api-Key: password" \ -s | jq ``` -Start a contract negotiation: +The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a +contract. 
However, in this sample case, we already have the provider asset `assetId` so we can get the related dataset +directly with this call: + +```shell +curl -H "X-Api-Key: password" \ + -H "Content-Type: application/json" \ + -d @advanced/advanced-01-open-telemetry/resources/get-dataset.json \ + -X POST "http://localhost:29193/management/v2/catalog/dataset/request" \ + -s | jq +``` + +The output will be something like: + +```json +{ + "@id": "assetId", + "@type": "dcat:Dataset", + "odrl:hasPolicy": { + "@id": "MQ==:YXNzZXRJZA==:YjI5ZDVkZDUtZWU0Mi00NWRiLWE2OTktYjNmMjlmMWNjODk3", + "@type": "odrl:Set", + "odrl:permission": [], + "odrl:prohibition": [], + "odrl:obligation": [], + "odrl:target": "assetId" + }, + "dcat:distribution": [ + { + "@type": "dcat:Distribution", + "dct:format": { + "@id": "HttpProxy" + }, + "dcat:accessService": "06348bca-6bf0-47fe-8bb5-6741cff7a955" + }, + { + "@type": "dcat:Distribution", + "dct:format": { + "@id": "HttpData" + }, + "dcat:accessService": "06348bca-6bf0-47fe-8bb5-6741cff7a955" + } + ], + "edc:name": "product description", + "edc:id": "assetId", + "edc:contenttype": "application/json", + "@context": { + "dct": "https://purl.org/dc/terms/", + "edc": "https://w3id.org/edc/v0.0.1/ns/", + "dcat": "https://www.w3.org/ns/dcat/", + "odrl": "http://www.w3.org/ns/odrl/2/", + "dspace": "https://w3id.org/dspace/v0.8/" + } +} +``` + +We can now use the `odrl:hasPolicy/@id` value to replace the placeholder in the [negotiate-contract.json](resources/negotiate-contract.json) file +and request the contract negotiation: ```bash curl -H "X-Api-Key: password" \ @@ -86,20 +142,23 @@ curl -H "X-Api-Key: password" \ -s | jq ``` -Wait until the negotiation is in `FINALIZED` state and call +At this point the contract agreement should already have been issued. To verify that, check the contract negotiation +state with this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call. 
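The placeholder replacement described above can also be scripted. A minimal sketch, using an illustrative inline template and offer id instead of the real resource file and a live catalog response:

```shell
# Illustrative template mirroring the placeholder in negotiate-contract.json
TEMPLATE='{"policy": {"@id": "{{contract-negotiation-id}}", "@type": "Set"}}'
# Offer id as returned in odrl:hasPolicy/@id (illustrative value)
OFFER_ID='MQ==:YXNzZXRJZA==:YjI5ZDVkZDUtZWU0Mi00NWRiLWE2OTktYjNmMjlmMWNjODk3'
# Substitute the placeholder; '|' is used as the sed delimiter because the id contains ':' and '='
BODY=$(printf '%s' "$TEMPLATE" | sed "s|{{contract-negotiation-id}}|$OFFER_ID|g")
printf '%s\n' "$BODY"
```

This is the same substitution the system tests perform with `replaceAll` on the `{{contract-negotiation-id}}` placeholder; against the real file you would run `sed` on `resources/negotiate-contract.json` and POST the result to the contract negotiation endpoint.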
-```bash -curl -X GET -H 'X-Api-Key: password' "http://localhost:29193/management/v2/contractnegotiations/{UUID}" +```shell +curl -H 'X-Api-Key: password' \ + -X GET "http://localhost:29193/management/v2/contractnegotiations/{{contract-negotiation-id}}" \ + -s | jq ``` -to get the contract agreement id. -Finally, update the contract agreement id in the [request body](resources/start-transfer.json) and execute a file transfer with the following command: +Finally, update the contract agreement id in the [start-transfer.json](resources/start-transfer.json) and execute a file transfer with the following command: ```bash curl -H "X-Api-Key: password" \ -H "Content-Type: application/json" \ -d @advanced/advanced-01-open-telemetry/resources/start-transfer.json \ - -X POST "http://localhost:29193/management/v2/transferprocesses" + -X POST "http://localhost:29193/management/v2/transferprocesses" \ + -s | jq ``` You can access the Jaeger UI on your browser at `http://localhost:16686`. In the search tool, we can select the service diff --git a/advanced/advanced-01-open-telemetry/resources/get-dataset.json b/advanced/advanced-01-open-telemetry/resources/get-dataset.json new file mode 100644 index 00000000..217aa29a --- /dev/null +++ b/advanced/advanced-01-open-telemetry/resources/get-dataset.json @@ -0,0 +1,7 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@type": "DatasetRequest", + "@id": "assetId", + "counterPartyAddress": "http://provider:19194/protocol", + "protocol": "dataspace-protocol-http" +} \ No newline at end of file diff --git a/advanced/advanced-01-open-telemetry/resources/negotiate-contract.json b/advanced/advanced-01-open-telemetry/resources/negotiate-contract.json index 2e6a36b4..b16050ac 100644 --- a/advanced/advanced-01-open-telemetry/resources/negotiate-contract.json +++ b/advanced/advanced-01-open-telemetry/resources/negotiate-contract.json @@ -11,7 +11,7 @@ "protocol": "dataspace-protocol-http", "policy": { "@context": 
"http://www.w3.org/ns/odrl.jsonld", - "@id": "MQ==:YXNzZXRJZA==:YTc4OGEwYjMtODRlZi00NWYwLTgwOWQtMGZjZTMwMGM3Y2Ey", + "@id": "{{contract-negotiation-id}}", "@type": "Set", "permission": [], "prohibition": [], diff --git a/advanced/advanced-01-open-telemetry/resources/start-transfer.json b/advanced/advanced-01-open-telemetry/resources/start-transfer.json index 354c41f2..171cc529 100644 --- a/advanced/advanced-01-open-telemetry/resources/start-transfer.json +++ b/advanced/advanced-01-open-telemetry/resources/start-transfer.json @@ -5,7 +5,7 @@ "@type": "TransferRequestDto", "connectorId": "provider", "counterPartyAddress": "http://provider:19194/protocol", - "contractId": "", + "contractId": "{{contract-agreement-id}}", "assetId": "assetId", "protocol": "dataspace-protocol-http", "dataDestination": { diff --git a/basic/basic-02-health-endpoint/README.md b/basic/basic-02-health-endpoint/README.md index 2eb8c71d..7691c7e7 100644 --- a/basic/basic-02-health-endpoint/README.md +++ b/basic/basic-02-health-endpoint/README.md @@ -7,7 +7,7 @@ An _extension_ typically consists of two things: 1. a class implementing the `ServiceExtension` interface. 2. a plugin file in the `src/main/resources/META-INF/services` directory. This file **must** be named exactly as the - interface's fully qualified class-name and it **must** contain the fully-qualified name of the implementing class ( + interface's fully qualified class name, and it **must** contain the fully-qualified name of the implementing class ( =plugin class). Therefore, we require an extension class, which we'll name `HealthEndpointExtension`: @@ -66,8 +66,7 @@ and can be configured using the `web.http.port` property (more on that in the ne this whenever you have two connectors running on the same machine. 
Also, the default path is `/api/*`, which is defined in -[`JettyConfiguration.java`](https://github.com/eclipse-edc/Connector/blob/releases/extensions/common/http/jetty-core/src/main/java/org/eclipse/edc/web/jetty/JettyConfiguration.java) -. +[`JettyConfiguration.java`](https://github.com/eclipse-edc/Connector/blob/releases/extensions/common/http/jetty-core/src/main/java/org/eclipse/edc/web/jetty/JettyConfiguration.java). --- diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/advanced/Advanced01openTelemetryTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/advanced/Advanced01openTelemetryTest.java index 8a4ae1ab..d13488dc 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/advanced/Advanced01openTelemetryTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/advanced/Advanced01openTelemetryTest.java @@ -9,6 +9,7 @@ * * Contributors: * Mercedes-Benz Tech Innovation GmbH - Sample workflow test + * Fraunhofer Institute for Software and Systems Engineering - use current ids instead of placeholder * */ @@ -19,7 +20,7 @@ import org.eclipse.edc.junit.annotations.EndToEndTest; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.ComposeContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -35,6 +36,7 @@ import static org.eclipse.edc.samples.common.NegotiationCommon.createAsset; import static org.eclipse.edc.samples.common.NegotiationCommon.createContractDefinition; import static org.eclipse.edc.samples.common.NegotiationCommon.createPolicy; +import static org.eclipse.edc.samples.common.NegotiationCommon.fetchDatasetFromCatalog; import static org.eclipse.edc.samples.common.NegotiationCommon.getContractAgreementId; import static 
org.eclipse.edc.samples.common.NegotiationCommon.negotiateContract; import static org.eclipse.edc.samples.common.PrerequisitesCommon.runPrerequisites; @@ -46,13 +48,14 @@ public class Advanced01openTelemetryTest { private static final String DOCKER_COMPOSE_YAML = "advanced/advanced-01-open-telemetry/docker-compose.yaml"; + private static final String FETCH_DATASET_FROM_CATALOG_FILE_PATH = "advanced/advanced-01-open-telemetry/resources/get-dataset.json"; private static final String NEGOTIATE_CONTRACT_FILE_PATH = "advanced/advanced-01-open-telemetry/resources/negotiate-contract.json"; private static final String START_TRANSFER_FILE_PATH = "advanced/advanced-01-open-telemetry/resources/start-transfer.json"; private static final String JAEGER_URL = "http://localhost:16686"; @Container - public static DockerComposeContainer environment = - new DockerComposeContainer<>(getFileFromRelativePath(DOCKER_COMPOSE_YAML)) + public static ComposeContainer environment = + new ComposeContainer(getFileFromRelativePath(DOCKER_COMPOSE_YAML)) .withLocalCompose(true) .waitingFor("consumer", Wait.forLogMessage(".*ready.*", 1)); @@ -67,7 +70,8 @@ void runSampleSteps() { createAsset(); createPolicy(); createContractDefinition(); - var contractNegotiationId = negotiateContract(NEGOTIATE_CONTRACT_FILE_PATH); + var catalogDatasetId = fetchDatasetFromCatalog(FETCH_DATASET_FROM_CATALOG_FILE_PATH); + var contractNegotiationId = negotiateContract(NEGOTIATE_CONTRACT_FILE_PATH, catalogDatasetId); var contractAgreementId = getContractAgreementId(contractNegotiationId); var transferProcessId = startTransfer(getFileContentFromRelativePath(START_TRANSFER_FILE_PATH), contractAgreementId); checkTransferStatus(transferProcessId, TransferProcessStates.STARTED); diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/common/NegotiationCommon.java b/system-tests/src/test/java/org/eclipse/edc/samples/common/NegotiationCommon.java index 11842c3b..a64c288b 100644 --- 
a/system-tests/src/test/java/org/eclipse/edc/samples/common/NegotiationCommon.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/common/NegotiationCommon.java @@ -9,6 +9,7 @@ * * Contributors: * Mercedes-Benz Tech Innovation GmbH - Initial implementation + * Fraunhofer Institute for Software and Systems Engineering - use current ids instead of placeholder * */ @@ -33,12 +34,14 @@ public class NegotiationCommon { private static final String V2_POLICY_DEFINITIONS_PATH = "/v2/policydefinitions"; private static final String CREATE_CONTRACT_DEFINITION_FILE_PATH = "transfer/transfer-01-negotiation/resources/create-contract-definition.json"; private static final String V2_CONTRACT_DEFINITIONS_PATH = "/v2/contractdefinitions"; - private static final String FETCH_CATALOG_FILE_PATH = "transfer/transfer-01-negotiation/resources/fetch-catalog.json"; - private static final String V2_CATALOG_REQUEST_PATH = "/v2/catalog/request"; + private static final String V2_CATALOG_DATASET_REQUEST_PATH = "/v2/catalog/dataset/request"; + private static final String FETCH_DATASET_FROM_CATALOG_FILE_PATH = "transfer/transfer-01-negotiation/resources/get-dataset.json"; + private static final String CATALOG_DATASET_ID = "\"odrl:hasPolicy\".'@id'"; private static final String NEGOTIATE_CONTRACT_FILE_PATH = "transfer/transfer-01-negotiation/resources/negotiate-contract.json"; private static final String V2_CONTRACT_NEGOTIATIONS_PATH = "/v2/contractnegotiations/"; private static final String CONTRACT_NEGOTIATION_ID = "@id"; private static final String CONTRACT_AGREEMENT_ID = "contractAgreementId"; + private static final String CONTRACT_NEGOTIATION_ID_KEY = "\\{\\{contract-negotiation-id\\}\\}"; public static void createAsset() { post(PrerequisitesCommon.PROVIDER_MANAGEMENT_URL + V3_ASSETS_PATH, getFileContentFromRelativePath(CREATE_ASSET_FILE_PATH)); @@ -52,20 +55,28 @@ public static void createContractDefinition() { post(PrerequisitesCommon.PROVIDER_MANAGEMENT_URL + 
V2_CONTRACT_DEFINITIONS_PATH, getFileContentFromRelativePath(CREATE_CONTRACT_DEFINITION_FILE_PATH)); } - public static void fetchCatalog() { - post(PrerequisitesCommon.CONSUMER_MANAGEMENT_URL + V2_CATALOG_REQUEST_PATH, getFileContentFromRelativePath(FETCH_CATALOG_FILE_PATH)); + public static String fetchDatasetFromCatalog(String fetchDatasetFromCatalogFilePath) { + var catalogDatasetId = post( + PrerequisitesCommon.CONSUMER_MANAGEMENT_URL + V2_CATALOG_DATASET_REQUEST_PATH, + getFileContentFromRelativePath(fetchDatasetFromCatalogFilePath), + CATALOG_DATASET_ID + ); + assertThat(catalogDatasetId).isNotEmpty(); + return catalogDatasetId; } - public static String negotiateContract(String negotiateContractFilePath) { - var contractNegotiationId = post(PrerequisitesCommon.CONSUMER_MANAGEMENT_URL + V2_CONTRACT_NEGOTIATIONS_PATH, getFileContentFromRelativePath(negotiateContractFilePath), CONTRACT_NEGOTIATION_ID); + public static String negotiateContract(String negotiateContractFilePath, String catalogDatasetId) { + var requestBody = getFileContentFromRelativePath(negotiateContractFilePath) + .replaceAll(CONTRACT_NEGOTIATION_ID_KEY, catalogDatasetId); + var contractNegotiationId = post( + PrerequisitesCommon.CONSUMER_MANAGEMENT_URL + V2_CONTRACT_NEGOTIATIONS_PATH, + requestBody, + CONTRACT_NEGOTIATION_ID + ); assertThat(contractNegotiationId).isNotEmpty(); return contractNegotiationId; } - public static String negotiateContract() { - return negotiateContract(NEGOTIATE_CONTRACT_FILE_PATH); - } - public static String getContractAgreementId(String contractNegotiationId) { String url = PrerequisitesCommon.CONSUMER_MANAGEMENT_URL + V2_CONTRACT_NEGOTIATIONS_PATH + contractNegotiationId; return await() @@ -78,8 +89,8 @@ public static String runNegotiation() { createAsset(); createPolicy(); createContractDefinition(); - fetchCatalog(); - var contractNegotiationId = negotiateContract(); + var catalogDatasetId = fetchDatasetFromCatalog(FETCH_DATASET_FROM_CATALOG_FILE_PATH); + var 
contractNegotiationId = negotiateContract(NEGOTIATE_CONTRACT_FILE_PATH, catalogDatasetId); return getContractAgreementId(contractNegotiationId); } } diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer01negotiationTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer01negotiationTest.java index b9b10f15..2e3bce76 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer01negotiationTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer01negotiationTest.java @@ -9,6 +9,7 @@ * * Contributors: * Mercedes-Benz Tech Innovation GmbH - Sample workflow test + * Fraunhofer Institute for Software and Systems Engineering - use current ids instead of placeholder * */ @@ -23,7 +24,7 @@ import static org.eclipse.edc.samples.common.NegotiationCommon.createAsset; import static org.eclipse.edc.samples.common.NegotiationCommon.createContractDefinition; import static org.eclipse.edc.samples.common.NegotiationCommon.createPolicy; -import static org.eclipse.edc.samples.common.NegotiationCommon.fetchCatalog; +import static org.eclipse.edc.samples.common.NegotiationCommon.fetchDatasetFromCatalog; import static org.eclipse.edc.samples.common.NegotiationCommon.getContractAgreementId; import static org.eclipse.edc.samples.common.NegotiationCommon.negotiateContract; import static org.eclipse.edc.samples.common.PrerequisitesCommon.getConsumer; @@ -39,14 +40,17 @@ public class Transfer01negotiationTest { @RegisterExtension static EdcRuntimeExtension consumer = getConsumer(); + private static final String NEGOTIATE_CONTRACT_FILE_PATH = "transfer/transfer-01-negotiation/resources/negotiate-contract.json"; + private static final String FETCH_DATASET_FROM_CATALOG_FILE_PATH = "transfer/transfer-01-negotiation/resources/get-dataset.json"; + @Test void runSampleSteps() { runPrerequisites(); createAsset(); createPolicy(); createContractDefinition(); - fetchCatalog(); - var 
contractNegotiationId = negotiateContract(); + var catalogDatasetId = fetchDatasetFromCatalog(FETCH_DATASET_FROM_CATALOG_FILE_PATH); + var contractNegotiationId = negotiateContract(NEGOTIATE_CONTRACT_FILE_PATH, catalogDatasetId); var contractAgreementId = getContractAgreementId(contractNegotiationId); assertThat(contractAgreementId).isNotEmpty(); } diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/util/TransferUtil.java b/system-tests/src/test/java/org/eclipse/edc/samples/util/TransferUtil.java index 3e1b4204..01971112 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/util/TransferUtil.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/util/TransferUtil.java @@ -9,6 +9,7 @@ * * Contributors: * Mercedes-Benz Tech Innovation GmbH - Initial implementation + * Fraunhofer Institute for Software and Systems Engineering - use current ids instead of placeholder * */ @@ -35,7 +36,7 @@ public class TransferUtil { public static final Duration POLL_INTERVAL = Duration.ofMillis(500); private static final String TRANSFER_PROCESS_ID = "@id"; - private static final String CONTRACT_AGREEMENT_ID_KEY = ""; + private static final String CONTRACT_AGREEMENT_ID_KEY = "\\{\\{contract-agreement-id\\}\\}"; private static final String V2_TRANSFER_PROCESSES_PATH = "/v2/transferprocesses/"; private static final String EDC_STATE = "state"; diff --git a/transfer/streaming/streaming-01-http-to-http/README.md b/transfer/streaming/streaming-01-http-to-http/README.md index e810cdd9..66331433 100644 --- a/transfer/streaming/streaming-01-http-to-http/README.md +++ b/transfer/streaming/streaming-01-http-to-http/README.md @@ -1,6 +1,6 @@ # Streaming HTTP to HTTP -This sample will show how you can set up the EDC to stream messages from HTTP to HTTP. +This sample will show how you can set up the Eclipse Dataspace Connector to stream messages from HTTP to HTTP. This code is only for demonstration purposes and should not be used in production. 
## Concept @@ -14,13 +14,14 @@ Build the connector runtime, which will be used both for the provider and consum ./gradlew :transfer:streaming:streaming-01-http-to-http:streaming-01-runtime:build ``` -Run the provider and the consumer, which must be started from different terminal shells: +Run the provider and the consumer with their own configuration, which will need to be started from different terminals: + ```shell -# provider export EDC_FS_CONFIG=transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/provider.properties java -jar transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build/libs/connector.jar +``` -#consumer +```shell export EDC_FS_CONFIG=transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/consumer.properties java -jar transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build/libs/connector.jar ``` @@ -28,8 +29,8 @@ java -jar transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/buil #### Register Data Plane on provider The provider connector needs to be aware of the streaming capabilities of the embedded dataplane, which can be registered with this call: -```js -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes" +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes" ``` If you look at the `dataplane.json` you'll notice that the supported source is `HttpStreaming` and the supported sink is `HttpData`. @@ -43,17 +44,25 @@ mkdir /tmp/source Then put the path in the [asset.json](asset.json) file replacing the `{{sourceFolder}}` placeholder. 
```json +{ "dataAddress": { "type": "HttpStreaming", - "sourceFolder": "{{sourceFolder}}" + "sourceFolder": "/tmp/source" } +} +``` + +Then use these three calls to create the Asset, the Policy Definition and the Contract Definition: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/asset.json -X POST "http://localhost:18181/management/v3/assets" +``` + +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" ``` -Then create the Asset, the Policy Definition and the Contract Definition with these three calls: ```shell -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/asset.json -X POST "http://localhost:18181/management/v3/assets" -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" ``` #### Negotiate the contract @@ -61,7 +70,7 @@ The typical flow requires fetching the catalog from the consumer side and using However, in this sample case, we already have the provider asset (`"stream-asset"`) so we can get the related dataset directly with this call: ```shell -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/get-dataset.json -X POST 
"http://localhost:28181/management/v2/catalog/dataset/request" -s | jq ``` The output will be something like: @@ -102,7 +111,7 @@ curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-htt ``` ### Start the transfer -First we need to set up the receiver server on the consumer side that will receive a call for every message. For this +First we need to set up the logging webserver on the consumer side that will receive a call for every message. For this you'll need to open another terminal shell and run: ```shell ./gradlew util:http-request-logger:build @@ -119,9 +128,9 @@ curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negot ``` If the `edc:contractAgreementId` is valued, it can be used to start the transfer, replacing it in the [transfer.json](transfer.json) file to `{{contract-agreement-id}}` and then calling the connector with this command: ```shell -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq ``` -> Note that the destination address is `localhost:4000`, this because is where our http server is listening. +> Note that the destination address is `localhost:4000`; this is because that is where our logging webserver is listening. Let's wait until the transfer state is `STARTED` state executing this call, replacing to `{{transfer-process-id}}` the id returned @@ -142,8 +151,7 @@ Incoming request Method: POST Path: / Body: -# EDC Samples -... 
+ ``` ### Up to you: second connector diff --git a/transfer/streaming/streaming-02-kafka-to-http/1-asset.json b/transfer/streaming/streaming-02-kafka-to-http/1-asset.json index 675614c9..a4b7d626 100644 --- a/transfer/streaming/streaming-02-kafka-to-http/1-asset.json +++ b/transfer/streaming/streaming-02-kafka-to-http/1-asset.json @@ -6,7 +6,6 @@ "dataAddress": { "type": "Kafka", "kafka.bootstrap.servers": "{{bootstrap.servers}}", - "maxDuration": "{{max.duration}}", "topic": "{{topic}}" } } diff --git a/transfer/streaming/streaming-02-kafka-to-http/README.md b/transfer/streaming/streaming-02-kafka-to-http/README.md index 888e98e2..c404fef6 100644 --- a/transfer/streaming/streaming-02-kafka-to-http/README.md +++ b/transfer/streaming/streaming-02-kafka-to-http/README.md @@ -15,13 +15,14 @@ Build the connector runtime, which will be used both for the provider and consum ./gradlew :transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime:build ``` -Run the provider and the consumer, which must be started from different terminal shells: +Run the provider and the consumer with their own configuration, which will need to be started from different terminals: + ```shell -# provider export EDC_FS_CONFIG=transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties java -jar transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build/libs/connector.jar +``` -#consumer +```shell export EDC_FS_CONFIG=transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties java -jar transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build/libs/connector.jar ``` @@ -31,7 +32,7 @@ java -jar transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/bui The provider connector needs to be aware of the kafka streaming capabilities of the embedded dataplane, which can be registered with this call: ```shell -curl -H 'Content-Type: application/json' -d 
@transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes" +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes" -s | jq ``` If you look at the `0-dataplane.json` you'll notice that the supported source is `Kafka` and the supported sink is `HttpData`. @@ -41,24 +42,32 @@ If you look at the `0-dataplane.json` you'll notice that the supported source is A "source" kafka topic must first be created where the data plane will get the event records to be sent to the consumers. To do this, initiate a Kafka server with the source topic: ```shell -docker run -e "KAFKA_CREATE_TOPICS={{topic}}:1:1" -p 9092:9092 -d bashj79/kafka-kraft:3.0.0 +docker run --rm --name=kafka-kraft -e "KAFKA_CREATE_TOPICS={{topic}}:1:1" -p 9092:9092 -d bashj79/kafka-kraft:3.0.0 ``` -Then put values of `kafka.bootstrap.servers`, `maxDuration` and `topic` in the [1-asset.json](1-asset.json) file replacing their placeholders. +Then put values of `kafka.bootstrap.servers` and `topic` in the [1-asset.json](1-asset.json) file replacing their placeholders. 
```json +{ "dataAddress": { "type": "Kafka", - "kafka.bootstrap.servers": "{{bootstrap.servers}}", - "maxDuration": "{{max.duration}}" - "topic": "{{topic}}" + "kafka.bootstrap.servers": "localhost:9092", + "topic": "kafka-stream-topic" } +} +``` + +Then use these three calls to create the Asset, the Policy Definition and the Contract Definition: + +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/1-asset.json -X POST "http://localhost:18181/management/v3/assets" -s | jq +``` + +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" -s | jq ``` -Then create the Asset, the Policy Definition and the Contract Definition with these three calls: ```shell -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/1-asset.json -X POST "http://localhost:18181/management/v3/assets" -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" -s | jq ``` ### Negotiate the contract @@ -67,7 +76,7 @@ The typical flow requires fetching the catalog from the consumer side and using However, in this sample case, we already have the provider asset (`"kafka-stream-asset"`) so we can get the related dataset directly with this call: ```shell -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json -X POST 
"http://localhost:28181/management/v2/catalog/dataset/request" -s | jq +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq ``` The output will be something like: @@ -109,7 +118,7 @@ curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kaf ### Start the transfer -First we need to set up the receiver server on the consumer side that will receive a call for every new event. For this +First we need to set up the logging webserver on the consumer side that will receive a call for every new event. For this you'll need to open another terminal shell and run: ```shell ./gradlew util:http-request-logger:build @@ -126,9 +135,9 @@ curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negot ``` If the `edc:contractAgreementId` is valued, it can be used to start the transfer, replacing it in the [6-transfer.json](6-transfer.json) file to `{{contract-agreement-id}}` and then calling the connector with this command: ```shell -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq ``` -> Note that the destination address is `localhost:4000`, this because is where our http server is listening. +> Note that the destination address is `localhost:4000`; this is because that is where our logging webserver is listening. 
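The wait for the `STARTED` state that follows can be automated with a small polling helper. This is only a sketch; the commented curl invocation shows the intended usage against the management API:

```shell
# Poll a command until its output contains the target state, or give up.
# $1 = target state, remaining args = the command to run on each attempt.
wait_for_state() {
  target="$1"; shift
  attempts=0
  while [ "$attempts" -lt 30 ]; do
    out=$("$@")
    case "$out" in
      *"$target"*) printf '%s\n' "$out"; return 0 ;;
    esac
    attempts=$((attempts + 1))
    sleep 1
  done
  return 1
}

# Illustrative usage (replace {{transfer-process-id}} as described above):
# wait_for_state STARTED curl -s "http://localhost:28181/management/v2/transferprocesses/{{transfer-process-id}}"
```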
Let's wait until the transfer state is `STARTED` state executing this call, replacing to `{{transfer-process-id}}` the id returned by the start transfer call: @@ -140,7 +149,7 @@ curl "http://localhost:28181/management/v2/transferprocesses/{{transfer-process- With the Kafka server running in Docker, you can use the Kafka command-line producer `kafka-console-producer.sh` to produce a message. In a new terminal shell, you'll need to execute: ```shell -docker exec -it {{docker-container-id}} /opt/kafka/bin/kafka-console-producer.sh --topic kafka-stream-topic --bootstrap-server localhost:9092 +docker exec -it kafka-kraft /opt/kafka/bin/kafka-console-producer.sh --topic kafka-stream-topic --bootstrap-server localhost:9092 ``` This command will open an interactive prompt for you to input your message. Once you've typed your message and pressed Enter, it will be produced, consumed and pushed to the receiver server. You should observe the content being logged on its terminal shell: @@ -150,5 +159,4 @@ Method: POST Path: / Body: -... ``` diff --git a/transfer/streaming/streaming-03-kafka-broker/README.md b/transfer/streaming/streaming-03-kafka-broker/README.md index 046c6c85..7273196a 100644 --- a/transfer/streaming/streaming-03-kafka-broker/README.md +++ b/transfer/streaming/streaming-03-kafka-broker/README.md @@ -1,11 +1,11 @@ # Streaming KAFKA to KAFKA -This sample demonstrates how to set up the EDC to stream messages through Kafka. +This sample demonstrates how to set up the Eclipse Dataspace Connector to stream messages through Kafka. This code is only for demonstration purposes and should not be used in production. ## Concept -In this sample the Data-Plane is not used, the consumer will set up a kafka client to poll the messages from the broker +In this sample the dataplane is not used; the consumer will set up a Kafka client to poll the messages from the broker using some credentials obtained from the transfer process. 
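As a sketch of what the consumer-side Kafka client configuration could look like once those credentials are obtained (the broker address, SASL mechanism, and `{{username}}`/`{{password}}` placeholders below are illustrative assumptions, not values produced by the sample):

```shell
# Write an illustrative Kafka client configuration; the credential values
# would come from the data address received during the transfer process.
cat > /tmp/streaming-03-consumer.properties <<'EOF'
bootstrap.servers=localhost:9093
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{{username}}" password="{{password}}";
EOF

# Illustrative usage with the Kafka CLI consumer:
# kafka-console-consumer.sh --topic kafka-stream-topic \
#   --bootstrap-server localhost:9093 --consumer.config /tmp/streaming-03-consumer.properties
```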
The DataFlow is managed by the [KafkaToKafkaDataFlowController](streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java), @@ -19,12 +19,14 @@ Build the connector runtime, which will be used both for the provider and consum ./gradlew :transfer:streaming:streaming-03-kafka-broker:streaming-03-runtime:build ``` -Run the provider and the consumer, which must be started from different terminal shells: +Run the provider and the consumer, each with its own configuration; they need to be started from different terminals: + ```shell -# provider export EDC_FS_CONFIG=transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/provider.properties java -jar transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/build/libs/connector.jar +``` +```shell #consumer export EDC_FS_CONFIG=transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/consumer.properties java -jar transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/build/libs/connector.jar @@ -75,21 +77,29 @@ docker exec -it kafka-kraft /bin/kafka-acls --command-config /config/admin.prope ### Register Asset, Policy Definition and Contract Definition on provider -Then put values of `kafka.bootstrap.servers`, `maxDuration` and `topic` in the [1-asset.json](1-asset.json) file replacing +Then put the values of `kafka.bootstrap.servers` and `topic` in the [1-asset.json](1-asset.json) file, replacing their placeholders this way: ```json +{ "dataAddress": { "type": "Kafka", "kafka.bootstrap.servers": "localhost:9093", "topic": "kafka-stream-topic" } +} +``` + +Then use these three calls to create the Asset, the Policy Definition and the Contract Definition: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/1-asset.json -X POST "http://localhost:18181/management/v3/assets" -s | jq +``` + +```shell +curl -H 'Content-Type: application/json' -d 
@transfer/streaming/streaming-03-kafka-broker/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" -s | jq ``` -Then create the Asset, the Policy Definition and the Contract Definition with these three calls: ```shell -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/1-asset.json -X POST "http://localhost:18181/management/v3/assets" -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" -s | jq ``` ### Negotiate the contract @@ -98,7 +108,7 @@ The typical flow requires fetching the catalog from the consumer side and using However, in this sample case, we already have the provider asset (`"kafka-stream-asset"`) so we can get the related dataset directly with this call: ```shell -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq ``` The output will be something like: @@ -114,13 +124,7 @@ The output will be something like: "odrl:obligation": [], "odrl:target": "kafka-stream-asset" }, - "dcat:distribution": { - "@type": "dcat:Distribution", - "dct:format": { - "@id": "HttpData" - }, - "dcat:accessService": "b24dfdbc-d17f-4d6e-9b5c-8fa71dacecfc" - }, + "dcat:distribution": [], "edc:id": 
"kafka-stream-asset", "@context": { "dct": "https://purl.org/dc/terms/", @@ -140,7 +144,7 @@ curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kaf ### Start the transfer -First we need to set up the receiver server on the consumer side that will receive the EndpointDataReference containing +First we need to set up the logging webserver on the consumer side that will receive the EndpointDataReference containing the address and credentials to connect to the broker and poll the messages from the topic. For this you'll need to open another terminal shell and run: ```shell @@ -158,9 +162,9 @@ curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negot If the `edc:contractAgreementId` is valued, it can be used to start the transfer, replacing it in the [6-transfer.json](6-transfer.json) file to `{{contract-agreement-id}}` and then calling the connector with this command: ```shell -curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq ``` -> Note that the destination address is `localhost:4000`, this because is where our http server is listening. +> Note that the destination address is `localhost:4000`, this because is where our logging webserver is listening. 
Let's wait until the transfer state is `STARTED` state executing this call, replacing to `{{transfer-process-id}}` the id returned by the start transfer call: @@ -173,11 +177,11 @@ Now in the console of the `http-request-logger` we started before, the `Endpoint ```json { "id":"8c52a781-2588-4c9b-8c70-4e5ad428eea9", - "endpoint":"localhost:9093", - "authKey":"alice", - "authCode":"alice-secret", + "endpoint": "localhost:9093", + "authKey": "alice", + "authCode": "alice-secret", "properties": { - "https://w3id.org/edc/v0.0.1/ns/topic":"kafka-stream-topic" + "https://w3id.org/edc/v0.0.1/ns/topic": "kafka-stream-topic" } } ``` diff --git a/transfer/transfer-01-negotiation/README.md b/transfer/transfer-01-negotiation/README.md index b653eedf..5d05e600 100644 --- a/transfer/transfer-01-negotiation/README.md +++ b/transfer/transfer-01-negotiation/README.md @@ -208,7 +208,7 @@ state, the negotiation is finished. We can now use the UUID to check the current negotiation using an endpoint on the consumer side. 
```bash -curl -X GET "http://localhost:29193/management/v2/contractnegotiations/" \ +curl -X GET "http://localhost:29193/management/v2/contractnegotiations/{{contract-negotiation-id}}" \ --header 'Content-Type: application/json' \ -s | jq ``` diff --git a/transfer/transfer-01-negotiation/resources/get-dataset.json b/transfer/transfer-01-negotiation/resources/get-dataset.json new file mode 100644 index 00000000..5286af00 --- /dev/null +++ b/transfer/transfer-01-negotiation/resources/get-dataset.json @@ -0,0 +1,7 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@type": "DatasetRequest", + "@id": "assetId", + "counterPartyAddress": "http://localhost:19194/protocol", + "protocol": "dataspace-protocol-http" +} \ No newline at end of file diff --git a/transfer/transfer-01-negotiation/resources/negotiate-contract.json b/transfer/transfer-01-negotiation/resources/negotiate-contract.json index d7c34519..2464f0cc 100644 --- a/transfer/transfer-01-negotiation/resources/negotiate-contract.json +++ b/transfer/transfer-01-negotiation/resources/negotiate-contract.json @@ -10,7 +10,7 @@ "protocol": "dataspace-protocol-http", "policy": { "@context": "http://www.w3.org/ns/odrl.jsonld", - "@id": "MQ==:YXNzZXRJZA==:YTc4OGEwYjMtODRlZi00NWYwLTgwOWQtMGZjZTMwMGM3Y2Ey", + "@id": "{{contract-offer-id}}", "@type": "Set", "permission": [], "prohibition": [], diff --git a/transfer/transfer-02-consumer-pull/README.md b/transfer/transfer-02-consumer-pull/README.md index a9122afc..c63847a0 100644 --- a/transfer/transfer-02-consumer-pull/README.md +++ b/transfer/transfer-02-consumer-pull/README.md @@ -101,7 +101,7 @@ to get the data from the provider: "id": "591bb609-1edb-4a6b-babe-50f1eca3e1e9", "endpoint": "http://localhost:29291/public/", "authKey": "Authorization", - "authCode": 
"eyJhbGciOiJSUzI1NiJ9.eyJkYWQiOiJ7XCJwcm9wZXJ0aWVzXCI6e1wiYXV0aEtleVwiOlwiQXV0aG9yaXphdGlvblwiLFwiYmFzZVVybFwiOlwiaHR0cDpcL1wvbG9jYWxob3N0OjE5MjkxXC9wdWJsaWNcL1wiLFwiYXV0aENvZGVcIjpcImV5SmhiR2NpT2lKU1V6STFOaUo5LmV5SmtZV1FpT2lKN1hDSndjbTl3WlhKMGFXVnpYQ0k2ZTF3aVltRnpaVlZ5YkZ3aU9sd2lhSFIwY0hNNlhDOWNMMnB6YjI1d2JHRmpaV2h2YkdSbGNpNTBlWEJwWTI5a1pTNWpiMjFjTDNWelpYSnpYQ0lzWENKdVlXMWxYQ0k2WENKVVpYTjBJR0Z6YzJWMFhDSXNYQ0owZVhCbFhDSTZYQ0pJZEhSd1JHRjBZVndpZlgwaUxDSmxlSEFpT2pFMk56UTFPRGcwTWprc0ltTnBaQ0k2SWpFNk1XVTBOemc1TldZdE9UQXlOUzAwT1dVeExUazNNV1F0WldJNE5qVmpNemhrTlRRd0luMC5ITFJ6SFBkT2IxTVdWeWdYZi15a0NEMHZkU3NwUXlMclFOelFZckw5eU1tQjBzQThwMHFGYWV0ZjBYZHNHMG1HOFFNNUl5NlFtNVU3QnJFOUwxSE5UMktoaHFJZ1U2d3JuMVhGVUhtOERyb2dSemxuUkRlTU9ZMXowcDB6T2MwNGNDeFJWOEZoemo4UnVRVXVFODYwUzhqbU4wZk5sZHZWNlFpUVFYdy00QmRTQjNGYWJ1TmFUcFh6bDU1QV9SR2hNUGphS2w3RGsycXpJZ0ozMkhIdGIyQzhhZGJCY1pmRk12aEM2anZ2U1FieTRlZXU0OU1hclEydElJVmFRS1B4ajhYVnI3ZFFkYV95MUE4anNpekNjeWxyU3ljRklYRUV3eHh6Rm5XWmczV2htSUxPUFJmTzhna2RtemlnaXRlRjVEcmhnNjZJZzJPR0Eza2dBTUxtc3dcIixcInByb3h5TWV0aG9kXCI6XCJ0cnVlXCIsXCJwcm94eVF1ZXJ5UGFyYW1zXCI6XCJ0cnVlXCIsXCJwcm94eUJvZHlcIjpcInRydWVcIixcInR5cGVcIjpcIkh0dHBEYXRhXCIsXCJwcm94eVBhdGhcIjpcInRydWVcIn19IiwiZXhwIjoxNjc0NTg4NDI5LCJjaWQiOiIxOjFlNDc4OTVmLTkwMjUtNDllMS05NzFkLWViODY1YzM4ZDU0MCJ9.WhbTzERmM75mNMUG2Sh-8ZW6uDQCus_5uJPvGjAX16Ucc-2rDcOhAxrHjR_AAV4zWjKBHxQhYk2o9jD-9OiYb8Urv8vN4WtYFhxJ09A0V2c6lB1ouuPyCA_qKqJEWryTbturht4vf7W72P37ERo_HwlObOuJMq9CS4swA0GBqWupZHAnF-uPIQckaS9vLybJ-gqEhGxSnY4QAZ9-iwSUhkrH8zY2GCDkzAWIPmvtvRhAs9NqVkoUswG-ez1SUw5bKF0hn2OXv_KhfR8VsKKYUbKDQf5Wagk7rumlYbXMPNAEEagI4R0xiwKWVTfwwZPy_pYnHE7b4GQECz3NjhgdIw", + "authCode": "{{auth-code}}", "properties": { "cid": "1:1e47895f-9025-49e1-971d-eb865c38d540" } diff --git a/transfer/transfer-02-consumer-pull/resources/start-transfer.json b/transfer/transfer-02-consumer-pull/resources/start-transfer.json index 52d28509..48a2cc31 100644 --- a/transfer/transfer-02-consumer-pull/resources/start-transfer.json +++ 
b/transfer/transfer-02-consumer-pull/resources/start-transfer.json @@ -5,7 +5,7 @@ "@type": "TransferRequestDto", "connectorId": "provider", "counterPartyAddress": "http://localhost:19194/protocol", - "contractId": "", + "contractId": "{{contract-agreement-id}}", "assetId": "assetId", "protocol": "dataspace-protocol-http", "dataDestination": { diff --git a/transfer/transfer-03-provider-push/resources/start-transfer.json b/transfer/transfer-03-provider-push/resources/start-transfer.json index 97134f2b..65fe1541 100644 --- a/transfer/transfer-03-provider-push/resources/start-transfer.json +++ b/transfer/transfer-03-provider-push/resources/start-transfer.json @@ -5,7 +5,7 @@ "@type": "TransferRequestDto", "connectorId": "provider", "counterPartyAddress": "http://localhost:19194/protocol", - "contractId": "", + "contractId": "{{contract-agreement-id}}", "assetId": "assetId", "protocol": "dataspace-protocol-http", "dataDestination": { diff --git a/transfer/transfer-04-event-consumer/README.md b/transfer/transfer-04-event-consumer/README.md index 9780c9a4..3358da19 100644 --- a/transfer/transfer-04-event-consumer/README.md +++ b/transfer/transfer-04-event-consumer/README.md @@ -78,7 +78,7 @@ curl -d @transfer/transfer-01-negotiation/resources/negotiate-contract.json \ ### 3. Get the contract agreement id ```bash -curl -X GET "http://localhost:29193/management/v2/contractnegotiations/" \ +curl -X GET "http://localhost:29193/management/v2/contractnegotiations/{{contract-negotiation-id}}" \ --header 'Content-Type: application/json' \ -s | jq ``` diff --git a/util/http-request-logger/build.gradle.kts b/util/http-request-logger/build.gradle.kts index b1bf00ab..aa623cc6 100644 --- a/util/http-request-logger/build.gradle.kts +++ b/util/http-request-logger/build.gradle.kts @@ -17,6 +17,7 @@ plugins { } tasks.withType { + from(sourceSets["main"].output) manifest { attributes["Main-Class"] = "org.eclipse.edc.samples.util.HttpRequestLoggerServer" }
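Several steps in the READMEs changed above ask the reader to copy an id out of a JSON response by hand — for example the dataset's `odrl:hasPolicy/@id` — into the next request template. As an optional convenience sketch (not part of this patch; the file and variable names are illustrative, and it assumes `jq` is installed), that copy step can be scripted against a saved response:

```shell
# Save a dataset response (here a stub with the same shape as the sample
# management-API output shown above) and extract the contract offer id with jq.
cat > /tmp/dataset-response.json <<'EOF'
{
  "@id": "assetId",
  "odrl:hasPolicy": {
    "@id": "MQ==:YXNzZXRJZA==:YjI5ZDVkZDUtZWU0Mi00NWRiLWE2OTktYjNmMjlmMWNjODk3"
  }
}
EOF

# jq's object-identifier index needs quoting because the key contains a colon.
OFFER_ID=$(jq -r '."odrl:hasPolicy"."@id"' /tmp/dataset-response.json)
echo "$OFFER_ID"
```

In a live run you would redirect the `curl ... | jq` output to the file instead of the stub above, and then substitute `$OFFER_ID` into the request template (e.g. with `sed` or `envsubst`) rather than editing the JSON manually.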