From 7961e7882b9eeaacc826413e66d3491988127c19 Mon Sep 17 00:00:00 2001 From: yurisilvapjd Date: Wed, 6 Sep 2023 21:37:30 +0100 Subject: [PATCH] feat: add streaming kafka-to-http sample Co-authored-by: ndr_brt --- gradle/libs.versions.toml | 6 + settings.gradle.kts | 1 + system-tests/build.gradle.kts | 3 + .../streaming/Streaming02KafkaToHttpTest.java | 148 +++++++++++++++++ .../streaming-02-kafka-to-http/.gitignore | 1 + .../0-dataplane.json | 9 + .../streaming-02-kafka-to-http/1-asset.json | 12 ++ .../2-policy-definition.json | 10 ++ .../3-contract-definition.json | 7 + .../4-get-dataset.json | 7 + .../5-negotiate-contract.json | 23 +++ .../6-transfer.json | 15 ++ .../streaming-02-kafka-to-http/README.md | 154 ++++++++++++++++++ .../streaming-02-runtime/build.gradle.kts | 48 ++++++ .../streaming-02-runtime/consumer.properties | 11 ++ .../streaming-02-runtime/provider.properties | 12 ++ 16 files changed, 467 insertions(+) create mode 100644 system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java create mode 100644 transfer/streaming/streaming-02-kafka-to-http/.gitignore create mode 100644 transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json create mode 100644 transfer/streaming/streaming-02-kafka-to-http/1-asset.json create mode 100644 transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json create mode 100644 transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json create mode 100644 transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json create mode 100644 transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json create mode 100644 transfer/streaming/streaming-02-kafka-to-http/6-transfer.json create mode 100644 transfer/streaming/streaming-02-kafka-to-http/README.md create mode 100644 transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build.gradle.kts create mode 100644 transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties create mode 100644 transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index b60edf39..f5fddb93 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,6 +12,8 @@ okhttp-mockwebserver = "5.0.0-alpha.11" openTelemetry = "1.18.0" restAssured = "5.3.1" rsApi = "3.1.0" +kafkaClients = "3.5.1" +testContainers = "1.18.3" [libraries] @@ -32,6 +34,7 @@ edc-data-plane-azure-storage = { module = "org.eclipse.edc:data-plane-azure-stor edc-data-plane-client = { module = "org.eclipse.edc:data-plane-client", version.ref = "edc" } edc-data-plane-core = { module = "org.eclipse.edc:data-plane-core", version.ref = "edc" } edc-data-plane-http = { module = "org.eclipse.edc:data-plane-http", version.ref = "edc" } +edc-data-plane-kafka = { module = "org.eclipse.edc:data-plane-kafka", version.ref = "edc" } edc-data-plane-selector-api = { module = "org.eclipse.edc:data-plane-selector-api", version.ref = "edc" } edc-data-plane-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" } edc-data-plane-selector-core = { module = "org.eclipse.edc:data-plane-selector-core", version.ref = "edc" } @@ -65,6 +68,9 @@ junit-pioneer = { module = "org.junit-pioneer:junit-pioneer", version.ref = "jun okhttp-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp-mockwebserver" } opentelemetry-annotations = { module = "io.opentelemetry:opentelemetry-extension-annotations", version.ref = "openTelemetry" } restAssured = { module = "io.rest-assured:rest-assured", version.ref = "restAssured" } +kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafkaClients" } +testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "testContainers" } +testcontainers-junit = { module = "org.testcontainers:junit-jupiter", version.ref = "testContainers" } [plugins] shadow = { id = "com.github.johnrengelman.shadow", version = "8.1.1" } diff --git a/settings.gradle.kts b/settings.gradle.kts index 5d3eb165..dbcb5126 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -57,6 +57,7 @@ include("transfer:transfer-06-consumer-pull-http:http-pull-connector") include("transfer:transfer-07-provider-push-http:http-push-connector") include("transfer:streaming:streaming-01-http-to-http:streaming-01-runtime") +include("transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime") include("util:http-request-logger") diff --git a/system-tests/build.gradle.kts b/system-tests/build.gradle.kts index b59b966e..bb2b87c6 100644 --- a/system-tests/build.gradle.kts +++ b/system-tests/build.gradle.kts @@ -24,6 +24,9 @@ dependencies { testImplementation(libs.awaitility) testImplementation(libs.okhttp.mockwebserver) testImplementation(libs.restAssured) + testImplementation(libs.testcontainers.junit) + testImplementation(libs.testcontainers.kafka) + testImplementation(libs.kafka.clients) // runtimes testCompileOnly(project(":basic:basic-01-basic-connector")) diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java new file mode 100644 index 00000000..af0220ed --- /dev/null +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample + * + */ + +package org.eclipse.edc.samples.transfer.streaming; + +import jakarta.json.Json; +import okhttp3.mockwebserver.MockWebServer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.eclipse.edc.junit.testfixtures.TestUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.std.StringSerializer; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileContentFromRelativePath; +import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileFromRelativePath; + +@Testcontainers +@EndToEndTest +public class Streaming02KafkaToHttpTest { + + private static final String KAFKA_IMAGE_NAME = "bashj79/kafka-kraft:3.0.0"; + private static final String TOPIC = "kafka-stream-topic"; + private static final String MAX_DURATION = "PT30S"; + private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-02-kafka-to-http"; + private static final Duration TIMEOUT = Duration.ofSeconds(30); + @Container + public static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName + .parse(KAFKA_IMAGE_NAME)).withEnv("KAFKA_CREATE_TOPICS", TOPIC.concat(":1:1")); + + private static final Participant PROVIDER = Participant.Builder.newInstance() + .name("provider") + .id("provider") + .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management"))) + .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol"))) + .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control"))) + .build(); + + private static final Participant CONSUMER = Participant.Builder.newInstance() + .name("consumer") + .id("consumer") + .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management"))) + .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol"))) + .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control"))) + .build(); + + @RegisterExtension + static EdcRuntimeExtension providerConnector = new EdcRuntimeExtension( + ":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime", + "provider", + Map.of( + "edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/provider.properties").getAbsolutePath() + ) + ); + + @RegisterExtension + static EdcRuntimeExtension consumerConnector = new EdcRuntimeExtension( + ":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime", + "provider", + Map.of( + "edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/consumer.properties").getAbsolutePath() + ) + ); + private final int httpReceiverPort = TestUtils.getFreePort(); + private final MockWebServer consumerReceiverServer = new MockWebServer(); + + @BeforeEach + void setUp() throws IOException { + KAFKA_CONTAINER.start(); + consumerReceiverServer.start(httpReceiverPort); + } + + @Test + void streamData() { + + PROVIDER.registerDataPlane(List.of("Kafka"), List.of("HttpData")); + + PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/1-asset.json") + .replace("{{bootstrap.servers}}", KAFKA_CONTAINER.getBootstrapServers()) + .replace("{{max.duration}}", MAX_DURATION) + .replace("{{topic}}", TOPIC)); + PROVIDER.createPolicyDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-policy-definition.json")); + PROVIDER.createContractDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-contract-definition.json")); + + var destination = Json.createObjectBuilder() + .add("type", "HttpData") + .add("baseUrl", "http://localhost:" + httpReceiverPort) + .build(); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, "kafka-stream-asset", Json.createObjectBuilder().build(), destination); + + await().atMost(TIMEOUT).untilAsserted(() -> { + String state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); + }); + + var producer = createKafkaProducer(); + var message = "fake message"; + producer.send(new ProducerRecord<>(TOPIC, "key", message)); + producer.send(new ProducerRecord<>(TOPIC, "key", message)); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var request = consumerReceiverServer.takeRequest(); + assertThat(request).isNotNull(); + assertThat(request.getBody().readByteArray()).isEqualTo(message.getBytes()); + }); + + producer.close(); + } + + private Producer createKafkaProducer() { + java.util.Properties props = new java.util.Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return new KafkaProducer<>(props); + } + +} diff --git a/transfer/streaming/streaming-02-kafka-to-http/.gitignore b/transfer/streaming/streaming-02-kafka-to-http/.gitignore new file mode 100644 index 00000000..3db92cfa --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/.gitignore @@ -0,0 +1 @@ +source/* diff --git a/transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json b/transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json new file mode 100644 index 00000000..9f1285a1 --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json @@ -0,0 +1,9 @@ +{ + "@context": { + "edc": "https://w3id.org/edc/v0.0.1/ns/" + }, + "@id": "http-pull-provider-dataplane", + "url": "http://localhost:19192/control/transfer", + "allowedSourceTypes": [ "Kafka" ], + "allowedDestTypes": [ "HttpData" ] +} diff --git a/transfer/streaming/streaming-02-kafka-to-http/1-asset.json b/transfer/streaming/streaming-02-kafka-to-http/1-asset.json new file mode 100644 index 00000000..675614c9 --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/1-asset.json @@ -0,0 +1,12 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@id": "kafka-stream-asset", + "properties": { + }, + "dataAddress": { + "type": "Kafka", + "kafka.bootstrap.servers": "{{bootstrap.servers}}", + "maxDuration": "{{max.duration}}", + "topic": "{{topic}}" + } +} diff --git a/transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json b/transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json new file mode 100644 index 00000000..4919c71a --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json @@ -0,0 +1,10 @@ +{ + "@context": { + "@vocab": "https://w3id.org/edc/v0.0.1/ns/", + "odrl": "http://www.w3.org/ns/odrl/2/" + }, + "@id": "no-constraint-policy", + "policy": { + "@type": "odrl:use" + } +} diff --git a/transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json b/transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json new file mode 100644 index 00000000..d424ec90 --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json @@ -0,0 +1,7 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@id": "contract-definition", + "accessPolicyId": "no-constraint-policy", + "contractPolicyId": "no-constraint-policy", + "assetsSelector": [] +} diff --git a/transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json b/transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json new file mode 100644 index 00000000..0ec57558 --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json @@ -0,0 +1,7 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@type": "DatasetRequest", + "@id": "kafka-stream-asset", + "counterPartyAddress": "http://localhost:18182/protocol", + "protocol": "dataspace-protocol-http" +} diff --git a/transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json b/transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json new file mode 100644 index 00000000..b525b894 --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json @@ -0,0 +1,23 @@ +{ + "@context": { + "edc": "https://w3id.org/edc/v0.0.1/ns/", + "odrl": "http://www.w3.org/ns/odrl/2/" + }, + "@type": "NegotiationInitiateRequestDto", + "connectorAddress": "http://localhost:18182/protocol", + "counterPartyAddress": "http://localhost:18182/protocol", + "providerId": "provider", + "protocol": "dataspace-protocol-http", + "offer": { + "offerId": "{{offerId}}", + "assetId": "kafka-stream-asset", + "policy": { + "@id": "{{offerId}}", + "@type": "use", + "odrl:permission": [], + "odrl:prohibition": [], + "odrl:obligation": [], + "odrl:target": "kafka-stream-asset" + } + } +} diff --git a/transfer/streaming/streaming-02-kafka-to-http/6-transfer.json b/transfer/streaming/streaming-02-kafka-to-http/6-transfer.json new file mode 100644 index 00000000..fc7e3937 --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/6-transfer.json @@ -0,0 +1,15 @@ +{ + "@context": { + "edc": "https://w3id.org/edc/v0.0.1/ns/" + }, + "@type": "TransferRequest", + "dataDestination": { + "type": "HttpData", + "baseUrl": "http://localhost:4000" + }, + "protocol": "dataspace-protocol-http", + "assetId": "stream-asset", + "contractId": "{{contract-agreement-id}}", + "connectorId": "provider", + "connectorAddress": "http://localhost:18182/protocol" +} diff --git a/transfer/streaming/streaming-02-kafka-to-http/README.md b/transfer/streaming/streaming-02-kafka-to-http/README.md new file mode 100644 index 00000000..aeac12c7 --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/README.md @@ -0,0 +1,154 @@ +# Streaming KAFKA to HTTP + +This sample demonstrates how to set up the EDC to stream messages from Kafka to HTTP. +This code is only for demonstration purposes and should not be used in production. + +## Concept + +We will use the data-plane kafka `DataSource` extension that will pull event records from a kafka topic and push it +to every consumer that has started a `TransferProcess` for a related asset. + +### Run + +Build the connector runtime, which will be used both for the provider and consumer: +```shell +./gradlew :transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime:build +``` + +Run the provider and the consumer, which must be started from different terminal shells: +```shell +# provider +export EDC_FS_CONFIG=transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties +java -jar transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build/libs/connector.jar + +#consumer +export EDC_FS_CONFIG=transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties +java -jar transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build/libs/connector.jar +``` + +### Register Data Plane on provider + +The provider connector needs to be aware of the kafka streaming capabilities of the embedded dataplane, which can be registered with +this call: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes" +``` + +If you look at the `0-dataplane.json` you'll notice that the supported source is `Kafka` and the supported sink is `HttpData`. + +### Register Asset, Policy Definition and Contract Definition on provider + +A "source" kafka topic must first be created where the data plane will get the event records to be sent to the consumers. +To do this, initiate a Kafka server with the source topic: +```shell +docker run -e "KAFKA_CREATE_TOPICS={{topic}}:1:1" -p 9092:9092 -d bashj79/kafka-kraft:3.0.0 +``` + +Then put values of `kafka.bootstrap.servers`, `maxDuration` and `topic` in the [1-asset.json](1-asset.json) file replacing their placeholders. +```json + "dataAddress": { + "type": "Kafka", + "kafka.bootstrap.servers": "{{bootstrap.servers}}", + "maxDuration": "{{max.duration}}" + "topic": "{{topic}}" + } +``` + +Then create the Asset, the Policy Definition and the Contract Definition with these three calls: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/1-asset.json -X POST "http://localhost:18181/management/v3/assets" +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" +``` + +### Negotiate the contract + +The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a contract. +However, in this sample case, we already have the provider asset (`"kafka-stream-asset"`) so we can get the related dataset +directly with this call: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq +``` + +The output will be something like: +```json +{ + "@id": "kafka-stream-asset", + "@type": "dcat:Dataset", + "odrl:hasPolicy": { + "@id": "Y29udHJhY3QtZGVmaW5pdGlvbg==:c3RyZWFtLWFzc2V0:NDlhYTUzZWEtMDUzMS00ZDkyLTg4Y2YtMGRjMTc4MmQ1NjY4", + "@type": "odrl:Set", + "odrl:permission": [], + "odrl:prohibition": [], + "odrl:obligation": [], + "odrl:target": "kafka-stream-asset" + }, + "dcat:distribution": { + "@type": "dcat:Distribution", + "dct:format": { + "@id": "HttpData" + }, + "dcat:accessService": "b24dfdbc-d17f-4d6e-9b5c-8fa71dacecfc" + }, + "edc:id": "kafka-stream-asset", + "@context": { + "dct": "https://purl.org/dc/terms/", + "edc": "https://w3id.org/edc/v0.0.1/ns/", + "dcat": "https://www.w3.org/ns/dcat/", + "odrl": "http://www.w3.org/ns/odrl/2/", + "dspace": "https://w3id.org/dspace/v0.8/" + } +} +``` + +With the `odrl:hasPolicy/@id` we can now replace it in the [negotiate-contract.json](5-negotiate-contract.json) file +and request the contract negotiation: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq +``` + +### Start the transfer + +First we need to set up the receiver server on the consumer side that will receive a call for every new event. For this +you'll need to open another terminal shell and run: +```shell +./gradlew util:http-request-logger:build +HTTP_SERVER_PORT=4000 java -jar util/http-request-logger/build/libs/http-request-logger.jar +``` +It will run on port 4000. + +At this point the contract agreement should already been issued, to verify that, please check the contract negotiation state with +this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call. +```shell +curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negotiation-id}}" -s | jq +``` + +If the `edc:contractAgreementId` is valued, it can be used to start the transfer, replacing it in the [6-transfer.json](6-transfer.json) +file to `{{contract-agreement-id}}` and then calling the connector with this command: +```shell +curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq +``` +> Note that the destination address is `localhost:4000`, this because is where our http server is listening. + +Let's wait until the transfer state is `STARTED` state executing this call, replacing to `{{transfer-process-id}}` the id returned +by the start transfer call: +```shell +curl "http://localhost:28181/management/v2/transferprocesses/{{transfer-process-id}}" -s | jq +``` + +### Produce events + +With the Kafka server running in Docker, you can use the Kafka command-line producer `kafka-console-producer.sh` to produce a message. In a new terminal shell, you'll need to execute: +```shell +docker exec -it {{docker-container-id}} /opt/kafka/bin/kafka-console-producer.sh --topic kafka-stream-topic --bootstrap-server localhost:9092 +``` +This command will open an interactive prompt for you to input your message. Once you've typed your message and pressed Enter, it will be produced, consumed and pushed to the receiver server. You should observe the content being logged on its terminal shell: + +``` +Incoming request +Method: POST +Path: / +Body: + +... +``` \ No newline at end of file diff --git a/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build.gradle.kts b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build.gradle.kts new file mode 100644 index 00000000..b55c467f --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build.gradle.kts @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2020, 2021 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - initial API and implementation + * Fraunhofer Institute for Software and Systems Engineering - added dependencies + * ZF Friedrichshafen AG - add dependency + * + */ + +plugins { + `java-library` + id("application") + alias(libs.plugins.shadow) +} + +dependencies { + implementation(libs.edc.control.plane.core) + implementation(libs.edc.data.plane.selector.core) + implementation(libs.edc.api.observability) + implementation(libs.edc.configuration.filesystem) + implementation(libs.edc.iam.mock) + implementation(libs.edc.management.api) + implementation(libs.edc.dsp) + implementation(libs.edc.data.plane.selector.api) + implementation(libs.edc.data.plane.selector.client) + implementation(libs.edc.transfer.data.plane) + implementation(libs.edc.data.plane.spi) + implementation(libs.edc.data.plane.core) + implementation(libs.edc.data.plane.http) + implementation(libs.edc.data.plane.kafka) + +} + +application { + mainClass.set("org.eclipse.edc.boot.system.runtime.BaseRuntime") +} + +tasks.withType { + mergeServiceFiles() + archiveFileName.set("connector.jar") +} diff --git a/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties new file mode 100644 index 00000000..a778cd4d --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties @@ -0,0 +1,11 @@ +web.http.port=28180 +web.http.path=/api +web.http.management.port=28181 +web.http.management.path=/management +web.http.protocol.port=28182 +web.http.protocol.path=/protocol +web.http.control.port=28183 +web.http.control.path=/control +edc.dsp.callback.address=http://localhost:28182/protocol +edc.participant.id=consumer +edc.ids.id=urn:connector:consumer diff --git a/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties new file mode 100644 index 00000000..a357378a --- /dev/null +++ b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties @@ -0,0 +1,12 @@ +web.http.port=18180 +web.http.path=/api +web.http.management.port=18181 +web.http.management.path=/management +web.http.protocol.port=18182 +web.http.protocol.path=/protocol +web.http.control.port=18183 +web.http.control.path=/control +edc.dsp.callback.address=http://localhost:18182/protocol +edc.participant.id=provider +edc.ids.id=urn:connector:provider +edc.dataplane.http.sink.partition.size=1