-
Notifications
You must be signed in to change notification settings - Fork 61
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add streaming kafka-to-http sample
Co-authored-by: ndr_brt <[email protected]>
- Loading branch information
1 parent
8ac69c0
commit 7961e78
Showing
16 changed files
with
467 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
148 changes: 148 additions & 0 deletions
148
.../src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
/* | ||
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License, Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* Contributors: | ||
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample | ||
* | ||
*/ | ||
|
||
package org.eclipse.edc.samples.transfer.streaming; | ||
|
||
import jakarta.json.Json; | ||
import okhttp3.mockwebserver.MockWebServer; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.eclipse.edc.junit.annotations.EndToEndTest; | ||
import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; | ||
import org.eclipse.edc.junit.testfixtures.TestUtils; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.RegisterExtension; | ||
import org.testcontainers.containers.KafkaContainer; | ||
import org.testcontainers.junit.jupiter.Container; | ||
import org.testcontainers.junit.jupiter.Testcontainers; | ||
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.std.StringSerializer; | ||
import org.testcontainers.utility.DockerImageName; | ||
|
||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.awaitility.Awaitility.await; | ||
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; | ||
import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileContentFromRelativePath; | ||
import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileFromRelativePath; | ||
|
||
@Testcontainers | ||
@EndToEndTest | ||
public class Streaming02KafkaToHttpTest { | ||
|
||
private static final String KAFKA_IMAGE_NAME = "bashj79/kafka-kraft:3.0.0"; | ||
private static final String TOPIC = "kafka-stream-topic"; | ||
private static final String MAX_DURATION = "PT30S"; | ||
private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-02-kafka-to-http"; | ||
private static final Duration TIMEOUT = Duration.ofSeconds(30); | ||
@Container | ||
public static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName | ||
.parse(KAFKA_IMAGE_NAME)).withEnv("KAFKA_CREATE_TOPICS", TOPIC.concat(":1:1")); | ||
|
||
private static final Participant PROVIDER = Participant.Builder.newInstance() | ||
.name("provider") | ||
.id("provider") | ||
.managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management"))) | ||
.protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol"))) | ||
.controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control"))) | ||
.build(); | ||
|
||
private static final Participant CONSUMER = Participant.Builder.newInstance() | ||
.name("consumer") | ||
.id("consumer") | ||
.managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management"))) | ||
.protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol"))) | ||
.controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control"))) | ||
.build(); | ||
|
||
@RegisterExtension | ||
static EdcRuntimeExtension providerConnector = new EdcRuntimeExtension( | ||
":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime", | ||
"provider", | ||
Map.of( | ||
"edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/provider.properties").getAbsolutePath() | ||
) | ||
); | ||
|
||
@RegisterExtension | ||
static EdcRuntimeExtension consumerConnector = new EdcRuntimeExtension( | ||
":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime", | ||
"provider", | ||
Map.of( | ||
"edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/consumer.properties").getAbsolutePath() | ||
) | ||
); | ||
private final int httpReceiverPort = TestUtils.getFreePort(); | ||
private final MockWebServer consumerReceiverServer = new MockWebServer(); | ||
|
||
@BeforeEach | ||
void setUp() throws IOException { | ||
KAFKA_CONTAINER.start(); | ||
consumerReceiverServer.start(httpReceiverPort); | ||
} | ||
|
||
@Test | ||
void streamData() { | ||
|
||
PROVIDER.registerDataPlane(List.of("Kafka"), List.of("HttpData")); | ||
|
||
PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/1-asset.json") | ||
.replace("{{bootstrap.servers}}", KAFKA_CONTAINER.getBootstrapServers()) | ||
.replace("{{max.duration}}", MAX_DURATION) | ||
.replace("{{topic}}", TOPIC)); | ||
PROVIDER.createPolicyDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-policy-definition.json")); | ||
PROVIDER.createContractDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-contract-definition.json")); | ||
|
||
var destination = Json.createObjectBuilder() | ||
.add("type", "HttpData") | ||
.add("baseUrl", "http://localhost:" + httpReceiverPort) | ||
.build(); | ||
|
||
var transferProcessId = CONSUMER.requestAsset(PROVIDER, "kafka-stream-asset", Json.createObjectBuilder().build(), destination); | ||
|
||
await().atMost(TIMEOUT).untilAsserted(() -> { | ||
String state = CONSUMER.getTransferProcessState(transferProcessId); | ||
assertThat(state).isEqualTo(STARTED.name()); | ||
}); | ||
|
||
var producer = createKafkaProducer(); | ||
var message = "fake message"; | ||
producer.send(new ProducerRecord<>(TOPIC, "key", message)); | ||
producer.send(new ProducerRecord<>(TOPIC, "key", message)); | ||
|
||
await().atMost(TIMEOUT).untilAsserted(() -> { | ||
var request = consumerReceiverServer.takeRequest(); | ||
assertThat(request).isNotNull(); | ||
assertThat(request.getBody().readByteArray()).isEqualTo(message.getBytes()); | ||
}); | ||
|
||
producer.close(); | ||
} | ||
|
||
private Producer<String, String> createKafkaProducer() { | ||
java.util.Properties props = new java.util.Properties(); | ||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); | ||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | ||
return new KafkaProducer<>(props); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
source/* |
9 changes: 9 additions & 0 deletions
9
transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
{ | ||
"@context": { | ||
"edc": "https://w3id.org/edc/v0.0.1/ns/" | ||
}, | ||
"@id": "http-pull-provider-dataplane", | ||
"url": "http://localhost:19192/control/transfer", | ||
"allowedSourceTypes": [ "Kafka" ], | ||
"allowedDestTypes": [ "HttpData" ] | ||
} |
12 changes: 12 additions & 0 deletions
12
transfer/streaming/streaming-02-kafka-to-http/1-asset.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{ | ||
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, | ||
"@id": "kafka-stream-asset", | ||
"properties": { | ||
}, | ||
"dataAddress": { | ||
"type": "Kafka", | ||
"kafka.bootstrap.servers": "{{bootstrap.servers}}", | ||
"maxDuration": "{{max.duration}}", | ||
"topic": "{{topic}}" | ||
} | ||
} |
10 changes: 10 additions & 0 deletions
10
transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
{ | ||
"@context": { | ||
"@vocab": "https://w3id.org/edc/v0.0.1/ns/", | ||
"odrl": "http://www.w3.org/ns/odrl/2/" | ||
}, | ||
"@id": "no-constraint-policy", | ||
"policy": { | ||
"@type": "odrl:use" | ||
} | ||
} |
7 changes: 7 additions & 0 deletions
7
transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, | ||
"@id": "contract-definition", | ||
"accessPolicyId": "no-constraint-policy", | ||
"contractPolicyId": "no-constraint-policy", | ||
"assetsSelector": [] | ||
} |
7 changes: 7 additions & 0 deletions
7
transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, | ||
"@type": "DatasetRequest", | ||
"@id": "kafka-stream-asset", | ||
"counterPartyAddress": "http://localhost:18182/protocol", | ||
"protocol": "dataspace-protocol-http" | ||
} |
23 changes: 23 additions & 0 deletions
23
transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
{ | ||
"@context": { | ||
"edc": "https://w3id.org/edc/v0.0.1/ns/", | ||
"odrl": "http://www.w3.org/ns/odrl/2/" | ||
}, | ||
"@type": "NegotiationInitiateRequestDto", | ||
"connectorAddress": "http://localhost:18182/protocol", | ||
"counterPartyAddress": "http://localhost:18182/protocol", | ||
"providerId": "provider", | ||
"protocol": "dataspace-protocol-http", | ||
"offer": { | ||
"offerId": "{{offerId}}", | ||
"assetId": "kafka-stream-asset", | ||
"policy": { | ||
"@id": "{{offerId}}", | ||
"@type": "use", | ||
"odrl:permission": [], | ||
"odrl:prohibition": [], | ||
"odrl:obligation": [], | ||
"odrl:target": "kafka-stream-asset" | ||
} | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
transfer/streaming/streaming-02-kafka-to-http/6-transfer.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
{ | ||
"@context": { | ||
"edc": "https://w3id.org/edc/v0.0.1/ns/" | ||
}, | ||
"@type": "TransferRequest", | ||
"dataDestination": { | ||
"type": "HttpData", | ||
"baseUrl": "http://localhost:4000" | ||
}, | ||
"protocol": "dataspace-protocol-http", | ||
"assetId": "stream-asset", | ||
"contractId": "{{contract-agreement-id}}", | ||
"connectorId": "provider", | ||
"connectorAddress": "http://localhost:18182/protocol" | ||
} |
Oops, something went wrong.