From f84a3fd4a6a3e9f6d2e6e6eb0a885893ed60a542 Mon Sep 17 00:00:00 2001
From: Jonas Bulcke <127748878+jobulcke@users.noreply.github.com>
Date: Thu, 17 Oct 2024 13:09:29 +0200
Subject: [PATCH] feat: extract props instead of using config (#699)
---
docs/_ldio/ldio-inputs/ldio-ldes-client.md | 10 -
.../event-stream-properties-fetcher/pom.xml | 16 ++
.../EventStreamPropertiesFetcher.java | 40 ++++
.../StartingNodeSpecificationFactory.java | 22 +++
.../valueobjects/EventStreamProperties.java | 39 ++++
.../valueobjects/PropertiesRequest.java | 24 +++
.../StartingNodeSpecification.java | 5 +
.../valueobjects/TreeNodeSpecification.java | 36 ++++
.../valueobjects/ViewSpecification.java | 47 +++++
.../EventStreamPropertiesFetcherTest.java | 84 ++++++++
.../StartingNodeSpecificationFactoryTest.java | 20 ++
.../TreeNodeSpecificationTest.java | 21 ++
.../valueobjects/ViewSpecificationTest.java | 28 +++
.../src/test/resources/models/eventstream.ttl | 59 ++++++
.../src/test/resources/models/treenode.ttl | 13 ++
.../src/test/resources/models/view.ttl | 43 +++++
ldi-core/ldes-client/pom.xml | 1 +
.../services/MemberSupplierWrapper.java | 15 ++
.../services/MemberSupplierWrappers.java | 24 +++
.../membersuppliers/MemberSupplierImpl.java | 2 +-
.../services/MemberSupplierWrapperTest.java | 55 ++++++
.../services/MemberSupplierWrappersTest.java | 53 ++++++
.../ldes-client-processor/pom.xml | 4 +
.../domain/valueobjects/LdesProperties.java | 25 ---
.../ldi/processors/LdesClientProcessor.java | 77 ++------
.../config/LdesProcessorProperties.java | 28 ---
.../ExactlyOnceMemberSupplierWrapper.java | 40 ++++
.../LatestStateMemberSupplierWrapper.java | 43 +++++
.../MemberSupplierWrappersBuilder.java | 31 +++
...sionMaterialisedMemberSupplierWrapper.java | 36 ++++
.../ldi/services/LdesPropertiesExtractor.java | 72 -------
.../LdesPropertyNotFoundException.java | 8 -
.../processors/LdesClientProcessorTest.java | 11 +-
.../ExactlyOnceMemberSupplierWrapperTest.java | 45 +++++
.../LatestStateMemberSupplierWrapperTest.java | 47 +++++
...MaterialisedMemberSupplierWrapperTest.java | 35 ++++
.../testutils/TestProcessContext.java | 38 ++++
.../200-response-with-indirect-url.json | 9 +-
.../LdioLdesClientConnectorAutoConfig.java | 19 +-
.../ldio-connectors/ldio-ldes-client/pom.xml | 5 +-
.../ldes/ldio/LdioLdesClientProperties.java | 100 +++++++++-
.../ldes/ldio/LdioLdesClientPropertyKeys.java | 19 ++
.../ldio/config/LdioLdesClientAutoConfig.java | 43 -----
.../config/LdioLdesClientConfigurator.java | 52 +++++
.../ldio/config/MemberSupplierFactory.java | 180 ++++--------------
.../ExactlyOnceMemberSupplierWrapper.java | 33 ++++
.../LatestStateMemberSupplierWrapper.java | 36 ++++
.../MemberSupplierWrappersBuilder.java | 30 +++
...sionMaterialisedMemberSupplierWrapper.java | 35 ++++
.../status/ClientStatusConverter.java | 42 ++++
.../ldio/LdioLdesClientPropertiesTest.java | 24 +++
.../ExactlyOnceMemberSupplierWrapperTest.java | 47 +++++
.../LatestStateMemberSupplierWrapperTest.java | 64 +++++++
.../ldio/config/LdioLdesClientITSteps.java | 8 +-
.../config/MemberSupplierFactoryTest.java | 49 ++++-
...MaterialisedMemberSupplierWrapperTest.java | 49 +++++
.../src/test/resources/__files/items-ldes.ttl | 11 ++
.../resources/features/ldes-client-in.feature | 2 -
.../src/test/resources/mappings/items.json | 15 ++
pom.xml | 5 +
60 files changed, 1649 insertions(+), 425 deletions(-)
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/pom.xml
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/EventStreamPropertiesFetcher.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/services/StartingNodeSpecificationFactory.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/EventStreamProperties.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/PropertiesRequest.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/StartingNodeSpecification.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/TreeNodeSpecification.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/ViewSpecification.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/EventStreamPropertiesFetcherTest.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/services/StartingNodeSpecificationFactoryTest.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/valueobjects/TreeNodeSpecificationTest.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/valueobjects/ViewSpecificationTest.java
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/eventstream.ttl
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/treenode.ttl
create mode 100644 ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/view.ttl
create mode 100644 ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrapper.java
create mode 100644 ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrappers.java
create mode 100644 ldi-core/ldes-client/tree-node-supplier/src/test/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrapperTest.java
create mode 100644 ldi-core/ldes-client/tree-node-supplier/src/test/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrappersTest.java
delete mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/domain/valueobjects/LdesProperties.java
create mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/ExactlyOnceMemberSupplierWrapper.java
create mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/LatestStateMemberSupplierWrapper.java
create mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/MemberSupplierWrappersBuilder.java
create mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/VersionMaterialisedMemberSupplierWrapper.java
delete mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/services/LdesPropertiesExtractor.java
delete mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/services/LdesPropertyNotFoundException.java
create mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/ExactlyOnceMemberSupplierWrapperTest.java
create mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/LatestStateMemberSupplierWrapperTest.java
create mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/VersionMaterialisedMemberSupplierWrapperTest.java
create mode 100644 ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/testutils/TestProcessContext.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientPropertyKeys.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientConfigurator.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/ExactlyOnceMemberSupplierWrapper.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/LatestStateMemberSupplierWrapper.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/MemberSupplierWrappersBuilder.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/VersionMaterialisedMemberSupplierWrapper.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/management/status/ClientStatusConverter.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientPropertiesTest.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/ExactlyOnceMemberSupplierWrapperTest.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LatestStateMemberSupplierWrapperTest.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/VersionMaterialisedMemberSupplierWrapperTest.java
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/__files/items-ldes.ttl
create mode 100644 ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/mappings/items.json
diff --git a/docs/_ldio/ldio-inputs/ldio-ldes-client.md b/docs/_ldio/ldio-inputs/ldio-ldes-client.md
index eec191826..1283d05e7 100644
--- a/docs/_ldio/ldio-inputs/ldio-ldes-client.md
+++ b/docs/_ldio/ldio-inputs/ldio-ldes-client.md
@@ -32,10 +32,6 @@ within a fragment.
When the fragment is marked as immutable, and no members can be added anymore, the LDES Client will stop keeping track
of members processed within that fragment.
-Members within a fragment can be processed in order of time based on a timestamp. The path to this timestamp needs to be
-configured.
-If the patch is missing, members will be processed in random order.
-
### Filtering
#### Exactly-once-filter
@@ -96,7 +92,6 @@ CPU ([source](https://www.sqlite.org/faq.html#q19)).
| _source-format_ | The 'Content-Type' that should be requested to the server | No | text/turtle | application/n-quads | Any type supported by [Apache Jena](https://jena.apache.org/documentation/io/rdf-input.html#determining-the-rdf-syntax) |
| _state_ | 'memory', 'sqlite' or 'postgres' to indicate how the state should be persisted | No | memory | sqlite | 'memory', 'sqlite' or 'postgres' |
| _keep-state_ | Indicates if the state should be persisted on shutdown (n/a for in memory states) | No | false | false | true or false |
-| _timestamp-path_ | The property-path used to determine the timestamp on which the members will be ordered, and used for the `latest-state-filter` when enabled | No | N/A | http://www.w3.org/ns/prov#generatedAtTime | A property path |
| _enable-exactly-once_ | Indicates whether a member must be sent exactly once or at least once | No | true | true | true or false |
{: .note }
@@ -113,13 +108,8 @@ api
| Property | Description | Required | Default | Example | Supported values |
|:--------------------------------------|:--------------------------------------------------------------------------------------|:---------|:-------------------------------------|:-------------------------------------|:-----------------|
| _materialisation.enabled_ | Indicates if the client should return state-objects (true) or version-objects (false) | No | false | true | true or false |
-| _materialisation.version-of-property_ | Property that points to the versionOfPath | No | http://purl.org/dc/terms/isVersionOf | http://purl.org/dc/terms/isVersionOf | true or false |
| _materialisation.enable-latest-state_ | Indicates whether all state or only the latest state must be sent | No | true | false | true or false |
-{: .note }
-Don't forgot to provide a timestamp-path in the general properties, as this property is not required, but necessary for
-this filter to work properly!
-
{% include ldio-core/http-requester.md %}
### SQLite properties
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/pom.xml b/ldi-core/ldes-client/event-stream-properties-fetcher/pom.xml
new file mode 100644
index 000000000..3969e2ed5
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/pom.xml
@@ -0,0 +1,16 @@
+
+
+ 4.0.0
+
+ be.vlaanderen.informatievlaanderen.ldes.client
+ ldes-client
+ 2.10.0-SNAPSHOT
+
+
+ event-stream-properties-fetcher
+
+
+
+
\ No newline at end of file
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/EventStreamPropertiesFetcher.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/EventStreamPropertiesFetcher.java
new file mode 100644
index 000000000..367707fda
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/EventStreamPropertiesFetcher.java
@@ -0,0 +1,40 @@
+package ldes.client.eventstreamproperties;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutor;
+import ldes.client.eventstreamproperties.services.StartingNodeSpecificationFactory;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.eventstreamproperties.valueobjects.PropertiesRequest;
+import ldes.client.eventstreamproperties.valueobjects.StartingNodeSpecification;
+import org.apache.jena.riot.RDFParser;
+
+import java.io.ByteArrayInputStream;
+
+public class EventStreamPropertiesFetcher {
+ private final RequestExecutor requestExecutor;
+
+ public EventStreamPropertiesFetcher(RequestExecutor requestExecutor) {
+ this.requestExecutor = requestExecutor;
+ }
+
+ public EventStreamProperties fetchEventStreamProperties(PropertiesRequest request) {
+ final EventStreamProperties eventStreamProperties = executePropertiesRequest(request);
+
+ if(eventStreamProperties.containsRequiredProperties()) {
+ return eventStreamProperties;
+ }
+
+ return executePropertiesRequest(request.withUrl(eventStreamProperties.getUri()));
+
+ }
+
+ private EventStreamProperties executePropertiesRequest(PropertiesRequest request) {
+ return requestExecutor.execute(request.createRequest()).getBody()
+ .map(ByteArrayInputStream::new)
+ .map(body -> RDFParser.source(body).lang(request.lang()).toModel())
+ .map(StartingNodeSpecificationFactory::fromModel)
+ .map(StartingNodeSpecification::extractEventStreamProperties)
+ .orElseThrow();
+ }
+
+
+}
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/services/StartingNodeSpecificationFactory.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/services/StartingNodeSpecificationFactory.java
new file mode 100644
index 000000000..f7f430551
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/services/StartingNodeSpecificationFactory.java
@@ -0,0 +1,22 @@
+package ldes.client.eventstreamproperties.services;
+
+import ldes.client.eventstreamproperties.valueobjects.StartingNodeSpecification;
+import ldes.client.eventstreamproperties.valueobjects.TreeNodeSpecification;
+import ldes.client.eventstreamproperties.valueobjects.ViewSpecification;
+import org.apache.jena.rdf.model.Model;
+
+public class StartingNodeSpecificationFactory {
+ private StartingNodeSpecificationFactory() {
+ }
+
+ public static StartingNodeSpecification fromModel(Model model) {
+ if (TreeNodeSpecification.isTreeNode(model)) {
+ return new TreeNodeSpecification(model);
+ }
+ if (ViewSpecification.isViewSpecification(model)) {
+ return new ViewSpecification(model);
+ }
+ throw new IllegalStateException("The provided starting node must contain either a dcterms:isPartOf property or the ldes:versionOfPath and ldes:timestampPath properties");
+ }
+
+}
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/EventStreamProperties.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/EventStreamProperties.java
new file mode 100644
index 000000000..ca12dc767
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/EventStreamProperties.java
@@ -0,0 +1,39 @@
+package ldes.client.eventstreamproperties.valueobjects;
+
+public final class EventStreamProperties {
+ private final String uri;
+ private final String versionOfPath;
+ private final String timestampPath;
+ private final String shaclShapeUri;
+
+ public EventStreamProperties(String uri) {
+ this(uri, null, null, null);
+ }
+
+ public EventStreamProperties(String uri, String versionOfPath, String timestampPath, String shaclShapeUri) {
+ this.uri = uri;
+ this.versionOfPath = versionOfPath;
+ this.timestampPath = timestampPath;
+ this.shaclShapeUri = shaclShapeUri;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public String getVersionOfPath() {
+ return versionOfPath;
+ }
+
+ public String getTimestampPath() {
+ return timestampPath;
+ }
+
+ public String getShaclShapeUri() {
+ return shaclShapeUri;
+ }
+
+ public boolean containsRequiredProperties() {
+ return timestampPath != null && versionOfPath != null;
+ }
+}
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/PropertiesRequest.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/PropertiesRequest.java
new file mode 100644
index 000000000..c04cfc406
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/PropertiesRequest.java
@@ -0,0 +1,24 @@
+package ldes.client.eventstreamproperties.valueobjects;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.valueobjects.GetRequest;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.valueobjects.Request;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.valueobjects.RequestHeader;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.valueobjects.RequestHeaders;
+import org.apache.http.HttpHeaders;
+import org.apache.jena.riot.Lang;
+
+import java.util.List;
+
+public record PropertiesRequest(String url, Lang lang) {
+
+ public Request createRequest() {
+ RequestHeaders requestHeaders = new RequestHeaders(List.of(
+ new RequestHeader(HttpHeaders.ACCEPT, lang.getHeaderString())
+ ));
+ return new GetRequest(url, requestHeaders);
+ }
+
+ public PropertiesRequest withUrl(String url) {
+ return new PropertiesRequest(url, lang);
+ }
+}
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/StartingNodeSpecification.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/StartingNodeSpecification.java
new file mode 100644
index 000000000..a1d7f8537
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/StartingNodeSpecification.java
@@ -0,0 +1,5 @@
+package ldes.client.eventstreamproperties.valueobjects;
+
+public interface StartingNodeSpecification {
+ EventStreamProperties extractEventStreamProperties();
+}
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/TreeNodeSpecification.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/TreeNodeSpecification.java
new file mode 100644
index 000000000..00d9c80e0
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/TreeNodeSpecification.java
@@ -0,0 +1,36 @@
+package ldes.client.eventstreamproperties.valueobjects;
+
+import org.apache.jena.rdf.model.Model;
+import org.apache.jena.rdf.model.Property;
+import org.apache.jena.rdf.model.RDFNode;
+
+import java.util.Optional;
+
+import static org.apache.jena.rdf.model.ResourceFactory.createProperty;
+
+public class TreeNodeSpecification implements StartingNodeSpecification {
+ public static final String DC_TERMS = "http://purl.org/dc/terms/";
+ public static final Property IS_PART_OF = createProperty(DC_TERMS, "isPartOf");
+ private final Model model;
+
+ public TreeNodeSpecification(Model model) {
+ this.model = model;
+ }
+
+ @Override
+ public EventStreamProperties extractEventStreamProperties() {
+ return extractTreeNode(model).filter(RDFNode::isURIResource)
+ .map(RDFNode::asResource)
+ .map(node -> new EventStreamProperties(node.getURI()))
+ .orElseThrow();
+ }
+
+ public static boolean isTreeNode(Model model) {
+ return extractTreeNode(model).isPresent();
+ }
+
+ private static Optional extractTreeNode(Model model) {
+ return model.listObjectsOfProperty(IS_PART_OF).nextOptional();
+ }
+
+}
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/ViewSpecification.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/ViewSpecification.java
new file mode 100644
index 000000000..6f75ae1e8
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/main/java/ldes/client/eventstreamproperties/valueobjects/ViewSpecification.java
@@ -0,0 +1,47 @@
+package ldes.client.eventstreamproperties.valueobjects;
+
+import org.apache.jena.rdf.model.Model;
+import org.apache.jena.rdf.model.Property;
+import org.apache.jena.rdf.model.Resource;
+
+import java.util.Optional;
+
+import static org.apache.jena.rdf.model.ResourceFactory.createProperty;
+
+public class ViewSpecification implements StartingNodeSpecification {
+ public static final String LDES = "https://w3id.org/ldes#";
+ public static final Property LDES_EVENT_STREAM = createProperty(LDES, "EventStream");
+ public static final String RDF_SYNTAX = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";
+ public static final Property RDF_SYNTAX_TYPE = createProperty(RDF_SYNTAX, "type");
+ public static final Property TREE_SHAPE = createProperty("https://w3id.org/tree#", "shape");
+ public static final Property LDES_VERSION_OF_PATH = createProperty(LDES, "versionOfPath");
+ public static final Property LDES_TIMESTAMP_PATH = createProperty(LDES, "timestampPath");
+
+ private final Model model;
+
+ public ViewSpecification(Model model) {
+ this.model = model;
+ }
+
+ @Override
+ public EventStreamProperties extractEventStreamProperties() {
+ final Resource subject = extractEventStream(model).orElseThrow();
+ final String shapeUri = Optional.ofNullable(subject.getPropertyResourceValue(TREE_SHAPE))
+ .map(Resource::getURI)
+ .orElse("");
+ return new EventStreamProperties(
+ subject.getURI(),
+ subject.getPropertyResourceValue(LDES_VERSION_OF_PATH).getURI(),
+ subject.getPropertyResourceValue(LDES_TIMESTAMP_PATH).getURI(),
+ shapeUri
+ );
+ }
+
+ public static boolean isViewSpecification(Model model) {
+ return extractEventStream(model).isPresent();
+ }
+
+ private static Optional extractEventStream(Model model) {
+ return model.listSubjectsWithProperty(RDF_SYNTAX_TYPE, LDES_EVENT_STREAM).nextOptional();
+ }
+}
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/EventStreamPropertiesFetcherTest.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/EventStreamPropertiesFetcherTest.java
new file mode 100644
index 000000000..55f2bd276
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/EventStreamPropertiesFetcherTest.java
@@ -0,0 +1,84 @@
+package ldes.client.eventstreamproperties;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutor;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.services.RequestExecutorFactory;
+import com.github.tomakehurst.wiremock.junit5.WireMockTest;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.eventstreamproperties.valueobjects.PropertiesRequest;
+import org.apache.jena.riot.Lang;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Objects;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@WireMockTest(httpPort = EventStreamPropertiesFetcherTest.WIREMOCK_PORT)
+class EventStreamPropertiesFetcherTest {
+ public static final int WIREMOCK_PORT = 12121;
+ private static final EventStreamProperties eventStreamProperties = new EventStreamProperties(
+ "http://localhost:12121/observations",
+ "http://purl.org/dc/terms/isVersionOf",
+ "http://www.w3.org/ns/prov#generatedAtTime",
+ ""
+ );
+ private EventStreamPropertiesFetcher fetcher;
+
+ @BeforeEach
+ void setUp() {
+ RequestExecutor requestExecutor = new RequestExecutorFactory().createNoAuthExecutor();
+ fetcher = new EventStreamPropertiesFetcher(requestExecutor);
+ }
+
+ @Test
+ void given_EventStream_when_FetchProperties_then_ReturnValidProperties() throws IOException, URISyntaxException {
+ URL resource = getClass().getClassLoader().getResource("models/eventstream.ttl");
+ final byte[] responseBytes = Files.readAllBytes(Path.of(Objects.requireNonNull(resource).toURI()));
+ stubFor(get("/observations").willReturn(ok().withBody(responseBytes)));
+
+ final EventStreamProperties properties = fetcher.fetchEventStreamProperties(new PropertiesRequest("http://localhost:12121/observations", Lang.TTL));
+
+ verify(getRequestedFor(urlEqualTo("/observations")));
+ assertThat(properties)
+ .usingRecursiveComparison()
+ .isEqualTo(eventStreamProperties);
+ }
+
+ @Test
+ void given_View_when_FetchProperties_then_ReturnValidProperties() throws IOException, URISyntaxException {
+ URL resource = getClass().getClassLoader().getResource("models/view.ttl");
+ final byte[] responseBytes = Files.readAllBytes(Path.of(Objects.requireNonNull(resource).toURI()));
+ stubFor(get("/observations/by-page").willReturn(ok().withBody(responseBytes)));
+
+ final EventStreamProperties properties = fetcher.fetchEventStreamProperties(new PropertiesRequest("http://localhost:12121/observations/by-page", Lang.TTL));
+
+ verify(getRequestedFor(urlEqualTo("/observations/by-page")));
+ assertThat(properties)
+ .usingRecursiveComparison()
+ .isEqualTo(eventStreamProperties);
+ }
+
+ @Test
+ void given_Fragment_when_FetchProperties_then_ReturnValidProperties() throws IOException, URISyntaxException {
+ URL treeNodeResource = getClass().getClassLoader().getResource("models/treenode.ttl");
+ final byte[] treeNodeBody = Files.readAllBytes(Path.of(Objects.requireNonNull(treeNodeResource).toURI()));
+ stubFor(get("/observations/by-page?pageNumber=1").willReturn(ok().withBody(treeNodeBody)));
+ URL eventSourceResource = getClass().getClassLoader().getResource("models/eventstream.ttl");
+ final byte[] eventStreamBody = Files.readAllBytes(Path.of(Objects.requireNonNull(eventSourceResource).toURI()));
+ stubFor(get("/observations").willReturn(ok().withBody(eventStreamBody)));
+
+ final EventStreamProperties properties = fetcher.fetchEventStreamProperties(new PropertiesRequest("http://localhost:12121/observations/by-page?pageNumber=1", Lang.TTL));
+
+ verify(getRequestedFor(urlEqualTo("/observations/by-page?pageNumber=1")));
+ verify(getRequestedFor(urlEqualTo("/observations")));
+ assertThat(properties)
+ .usingRecursiveComparison()
+ .isEqualTo(eventStreamProperties);
+ }
+}
\ No newline at end of file
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/services/StartingNodeSpecificationFactoryTest.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/services/StartingNodeSpecificationFactoryTest.java
new file mode 100644
index 000000000..8fd0a692c
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/services/StartingNodeSpecificationFactoryTest.java
@@ -0,0 +1,20 @@
+package ldes.client.eventstreamproperties.services;
+
+import org.apache.jena.rdf.model.Model;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFParser;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class StartingNodeSpecificationFactoryTest {
+ @Test
+ void given_invalidModel_when_FromModel_then_ThrowException() {
+ String invalid = " .";
+ Model model = RDFParser.fromString(invalid).lang(Lang.NQ).toModel();
+
+ assertThatThrownBy(() -> StartingNodeSpecificationFactory.fromModel(model))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("The provided starting node must contain either a dcterms:isPartOf property or the ldes:versionOfPath and ldes:timestampPath properties");
+ }
+}
\ No newline at end of file
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/valueobjects/TreeNodeSpecificationTest.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/valueobjects/TreeNodeSpecificationTest.java
new file mode 100644
index 000000000..aa2365a42
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/valueobjects/TreeNodeSpecificationTest.java
@@ -0,0 +1,21 @@
+package ldes.client.eventstreamproperties.valueobjects;
+
+import org.apache.jena.rdf.model.Model;
+import org.apache.jena.riot.RDFParser;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class TreeNodeSpecificationTest {
+ @Test
+ void test_ExtractEventStreamProperties() {
+ final EventStreamProperties expectedESProperties = new EventStreamProperties("http://localhost:12121/observations");
+ final Model model = RDFParser.source("models/treenode.ttl").toModel();
+
+ final EventStreamProperties eventStreamProperties = new TreeNodeSpecification(model).extractEventStreamProperties();
+
+ assertThat(eventStreamProperties)
+ .usingRecursiveComparison()
+ .isEqualTo(expectedESProperties);
+ }
+}
\ No newline at end of file
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/valueobjects/ViewSpecificationTest.java b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/valueobjects/ViewSpecificationTest.java
new file mode 100644
index 000000000..f680e28fb
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/java/ldes/client/eventstreamproperties/valueobjects/ViewSpecificationTest.java
@@ -0,0 +1,28 @@
+package ldes.client.eventstreamproperties.valueobjects;
+
+import org.apache.jena.rdf.model.Model;
+import org.apache.jena.riot.RDFParser;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ViewSpecificationTest {
+ @ParameterizedTest
+ @ValueSource(strings = {"models/view.ttl", "models/eventstream.ttl"})
+ void test_ExtractEventStreamProperties(String fileUri) {
+ final EventStreamProperties expectedESProperties = new EventStreamProperties(
+ "http://localhost:12121/observations",
+ "http://purl.org/dc/terms/isVersionOf",
+ "http://www.w3.org/ns/prov#generatedAtTime",
+ ""
+ );
+ final Model model = RDFParser.source(fileUri).toModel();
+
+ final EventStreamProperties eventStreamProperties = new ViewSpecification(model).extractEventStreamProperties();
+
+ assertThat(eventStreamProperties)
+ .usingRecursiveComparison()
+ .isEqualTo(expectedESProperties);
+ }
+}
\ No newline at end of file
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/eventstream.ttl b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/eventstream.ttl
new file mode 100644
index 000000000..1485bf810
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/eventstream.ttl
@@ -0,0 +1,59 @@
+@prefix by-location: .
+@prefix by-page: .
+@prefix dc: .
+@prefix dcat: .
+@prefix ldes: .
+@prefix observations: .
+@prefix prov: .
+@prefix rdf: .
+@prefix rdfs: .
+@prefix shacl: .
+@prefix skos: .
+@prefix terms: .
+@prefix tree: .
+@prefix vsds-verkeersmetingen: .
+
+observations:by-location
+ rdf:type tree:Node , rdfs:Resource ;
+ tree:viewDescription by-location:description .
+
+
+ rdf:type dc:Standard .
+
+by-location:description
+ rdf:type dcat:DataService , tree:ViewDescription ;
+ ldes:retentionPolicy [ rdf:type ldes:DurationAgoPolicy ;
+ tree:value "P2Y"^^
+ ] ;
+ tree:fragmentationStrategy ( [ rdf:type tree:GeospatialFragmentation ;
+ tree:fragmentationPath "http://www.opengis.net/ont/geosparql#asWKT" ;
+ tree:maxZoom "14"
+ ]
+ ) ;
+ tree:pageSize "250"^^ .
+
+
+
+by-page:description rdf:type dcat:DataService , tree:ViewDescription ;
+ ldes:retentionPolicy [ rdf:type ldes:DurationAgoPolicy ;
+ tree:value "P2Y"^^
+ ] ;
+ tree:fragmentationStrategy () ;
+ tree:pageSize "250"^^ .
+
+
+
+ rdf:type dc:Standard .
+
+
+ rdf:type ldes:EventStream ;
+ ldes:timestampPath prov:generatedAtTime ;
+ ldes:versionOfPath dc:isVersionOf ;
+ tree:shape [ rdf:type shacl:NodeShape ;
+ shacl:targetClass vsds-verkeersmetingen:Verkeerstelling
+ ] ;
+ tree:view observations:by-page , observations:by-location .
+
+
+observations:by-page rdf:type tree:Node , rdfs:Resource ;
+ tree:viewDescription by-page:description .
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/treenode.ttl b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/treenode.ttl
new file mode 100644
index 000000000..1c8983b76
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/treenode.ttl
@@ -0,0 +1,13 @@
+@prefix ldes: .
+@prefix prov: .
+@prefix rdf: .
+@prefix terms: .
+@prefix tree: .
+
+
+
+ rdf:type tree:Node ;
+ terms:isPartOf ;
+ tree:relation [ rdf:type tree:Relation ;
+ tree:node
+ ] .
diff --git a/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/view.ttl b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/view.ttl
new file mode 100644
index 000000000..2ec41535c
--- /dev/null
+++ b/ldi-core/ldes-client/event-stream-properties-fetcher/src/test/resources/models/view.ttl
@@ -0,0 +1,43 @@
+@prefix GDI-Vlaanderen-Trefwoorden: .
+@prefix access-right: .
+@prefix dcat: .
+@prefix ldes: .
+@prefix metadata-dcat: .
+@prefix observations: .
+@prefix prov: .
+@prefix rdf: .
+@prefix rdfs: .
+@prefix shacl: .
+@prefix terms: .
+@prefix tree: .
+@prefix vsds-verkeersmetingen: .
+
+
+ rdf:type dcat:DataService ;
+ metadata-dcat:statuut GDI-Vlaanderen-Trefwoorden:VLOPENDATASERVICE ;
+ terms:accessRights access-right:PUBLIC ;
+ terms:description "Default paginated view on the traffic counts in Flanders"@en , "Default gepagineerde view van de verkeerstellingen in Vlaanderen"@nl ;
+ terms:identifier "http://localhost:12121/observations/by-page"^^rdfs:Literal ;
+ terms:title "Traffic Count in Flanders by page"@en , "Verkeerstellingen in Vlaanderen per pagina"@nl ;
+ dcat:contactPoint ;
+ dcat:endpointDescription ;
+ dcat:endpointURL observations:by-page ;
+ dcat:servesDataset .
+
+
+ rdf:type ldes:EventStream ;
+ ldes:timestampPath prov:generatedAtTime ;
+ ldes:versionOfPath terms:isVersionOf ;
+ tree:view observations:by-page .
+
+observations:by-page rdf:type rdfs:Resource , tree:Node ;
+ tree:relation [ rdf:type tree:Relation ;
+ tree:node
+ ] .
+
+[ rdf:type shacl:NodeShape ;
+ shacl:targetClass vsds-verkeersmetingen:Verkeerstelling
+] .
+
+
+ rdf:type rdfs:Resource .
\ No newline at end of file
diff --git a/ldi-core/ldes-client/pom.xml b/ldi-core/ldes-client/pom.xml
index 590948eb5..538931924 100644
--- a/ldi-core/ldes-client/pom.xml
+++ b/ldi-core/ldes-client/pom.xml
@@ -18,6 +18,7 @@
tree-node-fetcher
tree-node-supplier
tree-node-relations-fetcher
+ event-stream-properties-fetcher
diff --git a/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrapper.java b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrapper.java
new file mode 100644
index 000000000..5b61c2c20
--- /dev/null
+++ b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrapper.java
@@ -0,0 +1,15 @@
+package ldes.client.treenodesupplier.domain.services;
+
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+
+public abstract class MemberSupplierWrapper {
+ public final MemberSupplier wrapMemberSupplier(MemberSupplier memberSupplier) {
+ return shouldBeWrapped()
+ ? createWrappedMemberSupplier(memberSupplier)
+ : memberSupplier;
+ }
+
+ protected abstract boolean shouldBeWrapped();
+
+ protected abstract MemberSupplier createWrappedMemberSupplier(MemberSupplier memberSupplier);
+}
diff --git a/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrappers.java b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrappers.java
new file mode 100644
index 000000000..778bc56ff
--- /dev/null
+++ b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrappers.java
@@ -0,0 +1,24 @@
+package ldes.client.treenodesupplier.domain.services;
+
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+
+import java.util.List;
+
+public class MemberSupplierWrappers {
+ private final List wrappers;
+
+ public MemberSupplierWrappers(List wrappers) {
+ this.wrappers = wrappers;
+ }
+
+ public MemberSupplier wrapMemberSupplier(MemberSupplier memberSupplier) {
+ for (MemberSupplierWrapper wrapper : wrappers) {
+ memberSupplier = wrapper.wrapMemberSupplier(memberSupplier);
+ }
+ return memberSupplier;
+ }
+
+ public interface Builder {
+ MemberSupplierWrappers build();
+ }
+}
diff --git a/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/membersuppliers/MemberSupplierImpl.java b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/membersuppliers/MemberSupplierImpl.java
index 84f3f4575..a369f1acf 100644
--- a/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/membersuppliers/MemberSupplierImpl.java
+++ b/ldi-core/ldes-client/tree-node-supplier/src/main/java/ldes/client/treenodesupplier/membersuppliers/MemberSupplierImpl.java
@@ -29,7 +29,7 @@ public SuppliedMember get() {
@Override
public void destroyState() {
- if (!keepState) {
+ if (!keepState && treeNodeProcessor != null) {
treeNodeProcessor.destroyState();
}
}
diff --git a/ldi-core/ldes-client/tree-node-supplier/src/test/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrapperTest.java b/ldi-core/ldes-client/tree-node-supplier/src/test/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrapperTest.java
new file mode 100644
index 000000000..4ba404342
--- /dev/null
+++ b/ldi-core/ldes-client/tree-node-supplier/src/test/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrapperTest.java
@@ -0,0 +1,55 @@
+package ldes.client.treenodesupplier.domain.services;
+
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+@ExtendWith(MockitoExtension.class)
+class MemberSupplierWrapperTest {
+ private MemberSupplierWrapper wrapper;
+
+
+ @Test
+ void given_ShouldBeWrapped_when_Wrap_then_ReturnWrappedMemberSupplier() {
+ final MemberSupplier base = mock();
+ wrapper = new TestMemberSupplierWrapper(true, mock());
+
+ final var result = wrapper.wrapMemberSupplier(base);
+
+ assertThat(result).isNotSameAs(base);
+ }
+
+ @Test
+ void given_ShouldNotBeWrapped_when_Wrap_then_ReturnBaseMemberSupplier() {
+ final MemberSupplier base = mock();
+ wrapper = new TestMemberSupplierWrapper(false, mock());
+
+ final var result = wrapper.wrapMemberSupplier(base);
+
+ assertThat(result).isSameAs(base);
+ }
+
+ private static class TestMemberSupplierWrapper extends MemberSupplierWrapper {
+ private final boolean shouldBeWrapped;
+ private final MemberSupplier nextMemberSupplier;
+
+ private TestMemberSupplierWrapper(boolean shouldBeWrapped, MemberSupplier nextMemberSupplier) {
+ this.shouldBeWrapped = shouldBeWrapped;
+ this.nextMemberSupplier = nextMemberSupplier;
+ }
+
+ @Override
+ protected boolean shouldBeWrapped() {
+ return shouldBeWrapped;
+ }
+
+ @Override
+ protected MemberSupplier createWrappedMemberSupplier(MemberSupplier memberSupplier) {
+ return nextMemberSupplier;
+ }
+ }
+}
\ No newline at end of file
diff --git a/ldi-core/ldes-client/tree-node-supplier/src/test/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrappersTest.java b/ldi-core/ldes-client/tree-node-supplier/src/test/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrappersTest.java
new file mode 100644
index 000000000..34905c03a
--- /dev/null
+++ b/ldi-core/ldes-client/tree-node-supplier/src/test/java/ldes/client/treenodesupplier/domain/services/MemberSupplierWrappersTest.java
@@ -0,0 +1,53 @@
+package ldes.client.treenodesupplier.domain.services;
+
+import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplierImpl;
+import ldes.client.treenodesupplier.membersuppliers.VersionMaterialisedMemberSupplier;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class MemberSupplierWrappersTest {
+ private final MemberSupplier baseMemberSupplier = new MemberSupplierImpl(mock(), false);
+ private final MemberSupplier firstWrapped = new FilteredMemberSupplier(baseMemberSupplier, mock());
+ private final MemberSupplier secondWrapped = new VersionMaterialisedMemberSupplier(firstWrapped, mock());
+ @Mock
+ private MemberSupplierWrapper mockedMemberSupplierWrapper;
+
+ private MemberSupplierWrappers wrappers;
+
+ @BeforeEach
+ void setUp() {
+ wrappers = new MemberSupplierWrappers(List.of(
+ mockedMemberSupplierWrapper,
+ mockedMemberSupplierWrapper,
+ mockedMemberSupplierWrapper
+ ));
+ }
+
+ @Test
+ void name() {
+ when(mockedMemberSupplierWrapper.wrapMemberSupplier(any()))
+ .thenReturn(firstWrapped)
+ .thenReturn(firstWrapped)
+ .thenReturn(secondWrapped);
+
+ final MemberSupplier result = wrappers.wrapMemberSupplier(baseMemberSupplier);
+
+ assertThat(result).isSameAs(secondWrapped);
+ verify(mockedMemberSupplierWrapper).wrapMemberSupplier(baseMemberSupplier);
+ verify(mockedMemberSupplierWrapper, times(2)).wrapMemberSupplier(firstWrapped);
+ verify(mockedMemberSupplierWrapper, never()).wrapMemberSupplier(secondWrapped);
+ }
+
+}
\ No newline at end of file
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/pom.xml b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/pom.xml
index 0634f8e20..edf8abf1c 100644
--- a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/pom.xml
+++ b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/pom.xml
@@ -22,6 +22,10 @@
ldi-common
${project.version}
+
+ be.vlaanderen.informatievlaanderen.ldes.client
+ event-stream-properties-fetcher
+
com.github.tomakehurst
wiremock-jre8
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/domain/valueobjects/LdesProperties.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/domain/valueobjects/LdesProperties.java
deleted file mode 100644
index d41206849..000000000
--- a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/domain/valueobjects/LdesProperties.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package be.vlaanderen.informatievlaanderen.ldes.ldi.domain.valueobjects;
-
-public class LdesProperties {
- private final String timestampPath;
- private final String versionOfPath;
- private final String shape;
-
- public LdesProperties(String timestampPath, String versionOfPath, String shape) {
- this.timestampPath = timestampPath;
- this.versionOfPath = versionOfPath;
- this.shape = shape;
- }
-
- public String getTimestampPath() {
- return timestampPath;
- }
-
- public String getVersionOfPath() {
- return versionOfPath;
- }
-
- public String getShape() {
- return shape;
- }
-}
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/LdesClientProcessor.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/LdesClientProcessor.java
index ffe7745e2..d2ab73f7f 100644
--- a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/LdesClientProcessor.java
+++ b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/LdesClientProcessor.java
@@ -1,26 +1,22 @@
package be.vlaanderen.informatievlaanderen.ldes.ldi.processors;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.VersionMaterialiser;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.domain.valueobjects.LdesProperties;
import be.vlaanderen.informatievlaanderen.ldes.ldi.processors.config.LdesProcessorProperties;
import be.vlaanderen.informatievlaanderen.ldes.ldi.processors.services.FlowManager;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.processors.wrappers.MemberSupplierWrappersBuilder;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.retry.RetryConfig;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.services.RequestExecutorDecorator;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.services.RequestExecutorFactory;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.services.LdesPropertiesExtractor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.timestampextractor.TimestampExtractor;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.timestampextractor.TimestampFromCurrentTimeExtractor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.timestampextractor.TimestampFromPathExtractor;
import io.github.resilience4j.retry.Retry;
+import ldes.client.eventstreamproperties.EventStreamPropertiesFetcher;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.eventstreamproperties.valueobjects.PropertiesRequest;
import ldes.client.treenodesupplier.TreeNodeProcessor;
import ldes.client.treenodesupplier.domain.valueobject.*;
-import ldes.client.treenodesupplier.filters.ExactlyOnceFilter;
-import ldes.client.treenodesupplier.filters.LatestStateFilter;
-import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
import ldes.client.treenodesupplier.membersuppliers.MemberSupplierImpl;
-import ldes.client.treenodesupplier.membersuppliers.VersionMaterialisedMemberSupplier;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.apache.jena.rdf.model.Model;
@@ -60,11 +56,9 @@ public class LdesClientProcessor extends AbstractProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(LdesClientProcessor.class);
private MemberSupplier memberSupplier;
- private LdesProperties ldesProperties;
+ private EventStreamProperties eventStreamProperties;
private final RequestExecutorFactory requestExecutorFactory = new RequestExecutorFactory(false);
- private final StatePersistenceFactory statePersistenceFactory = new StatePersistenceFactory();
private boolean hasLdesEnded;
- private boolean keepState;
@Override
public Set getRelationships() {
@@ -78,10 +72,8 @@ public final List getSupportedPropertyDescriptors() {
DATA_DESTINATION_FORMAT,
STATE_PERSISTENCE_STRATEGY,
KEEP_STATE,
- TIMESTAMP_PATH,
USE_EXACTLY_ONCE_FILTER,
USE_VERSION_MATERIALISATION,
- VERSION_OF_PROPERTY,
USE_LATEST_STATE_FILTER,
AUTHORIZATION_STRATEGY,
API_KEY_PROPERTY,
@@ -109,45 +101,22 @@ public void onScheduled(final ProcessContext context) {
Lang dataSourceFormat = LdesProcessorProperties.getDataSourceFormat(context);
final RequestExecutor requestExecutor = getRequestExecutorWithPossibleRetry(context);
LdesMetaData ldesMetaData = new LdesMetaData(dataSourceUrls, dataSourceFormat);
- StatePersistence statePersistence = statePersistenceFactory.getStatePersistence(context);
- String timestampPath = LdesProcessorProperties.getTimestampPath(context);
- TimestampExtractor timestampExtractor = timestampPath.isBlank() ? new TimestampFromCurrentTimeExtractor() :
- new TimestampFromPathExtractor(createProperty(timestampPath));
+ StatePersistence statePersistence = new StatePersistenceFactory().getStatePersistence(context);
+ eventStreamProperties = fetchEventStreamProperties(ldesMetaData, requestExecutor);
+ TimestampExtractor timestampExtractor = new TimestampFromPathExtractor(createProperty(eventStreamProperties.getTimestampPath()));
TreeNodeProcessor treeNodeProcessor = new TreeNodeProcessor(ldesMetaData, statePersistence, requestExecutor,
timestampExtractor, clientStatusConsumer());
- keepState = stateKept(context);
- final MemberSupplierImpl baseMemberSupplier = new MemberSupplierImpl(treeNodeProcessor, keepState);
-
- if (useVersionMaterialisation(context)) {
- final var versionOfProperty = createProperty(getVersionOfProperty(context));
- final var versionMaterialiser = new VersionMaterialiser(versionOfProperty, restrictToMembers(context));
- MemberSupplier decoratedMemberSupplier = useLatestStateFilter(context)
- ? new FilteredMemberSupplier(baseMemberSupplier, getLatestStateFilter(context, statePersistence))
- : baseMemberSupplier;
- memberSupplier = new VersionMaterialisedMemberSupplier(
- decoratedMemberSupplier,
- versionMaterialiser
- );
- } else if (useExactlyOnceFilter(context)) {
- memberSupplier = new FilteredMemberSupplier(
- baseMemberSupplier,
- new ExactlyOnceFilter(statePersistence.getMemberIdRepository(), keepState)
- );
- } else {
- memberSupplier = baseMemberSupplier;
- }
-
+ final MemberSupplier baseMemberSupplier = new MemberSupplierImpl(treeNodeProcessor, stateKept(context));
+ memberSupplier = new MemberSupplierWrappersBuilder()
+ .withContext(context)
+ .withEventStreamProperties(eventStreamProperties)
+ .build()
+ .wrapMemberSupplier(baseMemberSupplier);
memberSupplier.init();
- determineLdesProperties(ldesMetaData, requestExecutor, context);
-
LOGGER.info("LDES Client processor {} configured to follow (sub)streams {} (expected LDES source format: {})",
context.getName(), dataSourceUrls, dataSourceFormat);
}
- private LatestStateFilter getLatestStateFilter(ProcessContext context, StatePersistence statePersistence) {
- return new LatestStateFilter(statePersistence.getMemberVersionRepository(), keepState, getTimestampPath(context), getVersionOfProperty(context));
- }
-
private RequestExecutor getRequestExecutorWithPossibleRetry(final ProcessContext context) {
return RequestExecutorDecorator.decorate(getRequestExecutor(context)).with(getRetry(context)).get();
}
@@ -175,13 +144,9 @@ private RequestExecutor getRequestExecutor(final ProcessContext context) {
};
}
- private void determineLdesProperties(LdesMetaData ldesMetaData, RequestExecutor requestExecutor,
- ProcessContext context) {
- boolean timestampPath = streamTimestampPathProperty(context);
- boolean versionOfPath = streamVersionOfProperty(context);
- boolean shape = streamShapeProperty(context);
- ldesProperties = new LdesPropertiesExtractor(requestExecutor).getLdesProperties(ldesMetaData, timestampPath,
- versionOfPath, shape);
+ private EventStreamProperties fetchEventStreamProperties(LdesMetaData ldesMetaData, RequestExecutor requestExecutor) {
+ PropertiesRequest request = new PropertiesRequest(ldesMetaData.getStartingNodeUrl(), ldesMetaData.getLang());
+ return new EventStreamPropertiesFetcher(requestExecutor).fetchEventStreamProperties(request);
}
@Override
@@ -205,13 +170,13 @@ private void processNextMember(ProcessContext context, ProcessSession session) {
FlowFile flowFile = session.create();
if (streamTimestampPathProperty(context)) {
- session.putAttribute(flowFile, "ldes.timestamppath", ldesProperties.getTimestampPath());
+ session.putAttribute(flowFile, "ldes.timestamppath", eventStreamProperties.getTimestampPath());
}
if (streamVersionOfProperty(context)) {
- session.putAttribute(flowFile, "ldes.isversionofpath", ldesProperties.getVersionOfPath());
+ session.putAttribute(flowFile, "ldes.isversionofpath", eventStreamProperties.getVersionOfPath());
}
if (streamShapeProperty(context)) {
- session.putAttribute(flowFile, "ldes.shacleshapes", ldesProperties.getShape());
+ session.putAttribute(flowFile, "ldes.shacleshapes", eventStreamProperties.getShaclShapeUri());
}
Lang dataDestinationFormat = LdesProcessorProperties.getDataDestinationFormat(context);
FlowManager.sendRDFToRelation(session, flowFile,
@@ -221,7 +186,7 @@ private void processNextMember(ProcessContext context, ProcessSession session) {
@OnRemoved
public void onRemoved() {
- if (!keepState && memberSupplier != null) {
+ if (memberSupplier != null) {
memberSupplier.destroyState();
}
}
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/config/LdesProcessorProperties.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/config/LdesProcessorProperties.java
index ee6a2bd8f..32a86e299 100644
--- a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/config/LdesProcessorProperties.java
+++ b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/config/LdesProcessorProperties.java
@@ -5,7 +5,6 @@
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFLanguages;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
@@ -47,15 +46,6 @@ private LdesProcessorProperties() {
.defaultValue(DEFAULT_DATA_SOURCE_FORMAT.getHeaderString())
.build();
- public static final PropertyDescriptor TIMESTAMP_PATH = new PropertyDescriptor.Builder()
- .name("TIMESTAMP_PATH")
- .displayName("Timestamp path")
- .description("Property path determining the timestamp used to order the members within a fragment")
- .required(false)
- .addValidator(Validator.VALID)
- .defaultValue("http://www.w3.org/ns/prov#generatedAtTime")
- .build();
-
public static final PropertyDescriptor STREAM_TIMESTAMP_PATH_PROPERTY = new PropertyDescriptor.Builder()
.name("STREAM_TIMESTAMP_PATH_PROPERTY")
.displayName("Stream timestamp path property")
@@ -190,16 +180,6 @@ private LdesProcessorProperties() {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
- public static final PropertyDescriptor VERSION_OF_PROPERTY = new PropertyDescriptor.Builder()
- .name("VERSION_OF_PROPERTY")
- .displayName("Version of property")
- .description("Property that points to the versionOfPath")
- .required(true)
- .defaultValue("http://purl.org/dc/terms/isVersionOf")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .addValidator(StandardValidators.URI_VALIDATOR)
- .build();
-
public static final PropertyDescriptor USE_EXACTLY_ONCE_FILTER = new PropertyDescriptor.Builder()
.name("USE_EXACTLY_ONCE_FILTER")
.displayName("Use exactly once filter")
@@ -238,10 +218,6 @@ public static Lang getDataDestinationFormat(final ProcessContext context) {
return RDFLanguages.nameToLang(context.getProperty(DATA_DESTINATION_FORMAT).getValue());
}
- public static String getTimestampPath(final ProcessContext context) {
- return context.getProperty(TIMESTAMP_PATH).getValue();
- }
-
public static boolean streamTimestampPathProperty(final ProcessContext context) {
return TRUE.equals(context.getProperty(STREAM_TIMESTAMP_PATH_PROPERTY).asBoolean());
}
@@ -318,10 +294,6 @@ public static boolean restrictToMembers(final ProcessContext context) {
return TRUE.equals(context.getProperty(RESTRICT_TO_MEMBERS).asBoolean());
}
- public static String getVersionOfProperty(final ProcessContext context) {
- return context.getProperty(VERSION_OF_PROPERTY).getValue();
- }
-
private static boolean isValidUrl(String url) {
try {
new URI(url);
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/ExactlyOnceMemberSupplierWrapper.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/ExactlyOnceMemberSupplierWrapper.java
new file mode 100644
index 000000000..7a6e8599f
--- /dev/null
+++ b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/ExactlyOnceMemberSupplierWrapper.java
@@ -0,0 +1,40 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldi.processors.wrappers;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldi.processors.StatePersistenceFactory;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.processors.config.PersistenceProperties;
+import ldes.client.treenodesupplier.domain.services.MemberSupplierWrapper;
+import ldes.client.treenodesupplier.filters.ExactlyOnceFilter;
+import ldes.client.treenodesupplier.filters.MemberFilter;
+import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import ldes.client.treenodesupplier.repository.MemberIdRepository;
+import org.apache.nifi.processor.ProcessContext;
+
+import static be.vlaanderen.informatievlaanderen.ldes.ldi.processors.config.LdesProcessorProperties.useExactlyOnceFilter;
+import static be.vlaanderen.informatievlaanderen.ldes.ldi.processors.config.LdesProcessorProperties.useVersionMaterialisation;
+
+public class ExactlyOnceMemberSupplierWrapper extends MemberSupplierWrapper {
+ private final ProcessContext context;
+
+ public ExactlyOnceMemberSupplierWrapper(ProcessContext context) {
+ this.context = context;
+ }
+
+ @Override
+ protected boolean shouldBeWrapped() {
+ return !useVersionMaterialisation(context) && useExactlyOnceFilter(context);
+ }
+
+ @Override
+ protected MemberSupplier createWrappedMemberSupplier(MemberSupplier memberSupplier) {
+ return new FilteredMemberSupplier(memberSupplier, createMemberFilter());
+ }
+
+ private MemberFilter createMemberFilter() {
+ final MemberIdRepository memberIdRepository = new StatePersistenceFactory()
+ .getStatePersistence(context)
+ .getMemberIdRepository();
+ final boolean keepState = PersistenceProperties.stateKept(context);
+ return new ExactlyOnceFilter(memberIdRepository, keepState);
+ }
+}
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/LatestStateMemberSupplierWrapper.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/LatestStateMemberSupplierWrapper.java
new file mode 100644
index 000000000..dbbeeb675
--- /dev/null
+++ b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/LatestStateMemberSupplierWrapper.java
@@ -0,0 +1,43 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldi.processors.wrappers;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldi.processors.StatePersistenceFactory;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.processors.config.PersistenceProperties;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.treenodesupplier.domain.services.MemberSupplierWrapper;
+import ldes.client.treenodesupplier.filters.LatestStateFilter;
+import ldes.client.treenodesupplier.filters.MemberFilter;
+import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import ldes.client.treenodesupplier.repository.MemberVersionRepository;
+import org.apache.nifi.processor.ProcessContext;
+
+import static be.vlaanderen.informatievlaanderen.ldes.ldi.processors.config.LdesProcessorProperties.useLatestStateFilter;
+import static be.vlaanderen.informatievlaanderen.ldes.ldi.processors.config.LdesProcessorProperties.useVersionMaterialisation;
+
+public class LatestStateMemberSupplierWrapper extends MemberSupplierWrapper {
+ private final ProcessContext context;
+ private final EventStreamProperties eventStreamProperties;
+
+ public LatestStateMemberSupplierWrapper(ProcessContext context, EventStreamProperties eventStreamProperties) {
+ this.context = context;
+ this.eventStreamProperties = eventStreamProperties;
+ }
+
+ @Override
+ protected boolean shouldBeWrapped() {
+ return useVersionMaterialisation(context) && useLatestStateFilter(context);
+ }
+
+ @Override
+ protected MemberSupplier createWrappedMemberSupplier(MemberSupplier memberSupplier) {
+ return new FilteredMemberSupplier(memberSupplier, createMemberFilter());
+ }
+
+ private MemberFilter createMemberFilter() {
+ final MemberVersionRepository memberVersionRepository = new StatePersistenceFactory()
+ .getStatePersistence(context)
+ .getMemberVersionRepository();
+ final boolean keepState = PersistenceProperties.stateKept(context);
+ return new LatestStateFilter(memberVersionRepository, keepState, eventStreamProperties.getTimestampPath(), eventStreamProperties.getVersionOfPath());
+ }
+}
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/MemberSupplierWrappersBuilder.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/MemberSupplierWrappersBuilder.java
new file mode 100644
index 000000000..3b440717a
--- /dev/null
+++ b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/MemberSupplierWrappersBuilder.java
@@ -0,0 +1,31 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldi.processors.wrappers;
+
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.treenodesupplier.domain.services.MemberSupplierWrappers;
+import org.apache.nifi.processor.ProcessContext;
+
+import java.util.List;
+
+public class MemberSupplierWrappersBuilder implements MemberSupplierWrappers.Builder {
+ private ProcessContext context;
+ private EventStreamProperties eventStreamProperties;
+
+ public MemberSupplierWrappersBuilder withContext(ProcessContext context) {
+ this.context = context;
+ return this;
+ }
+
+ public MemberSupplierWrappersBuilder withEventStreamProperties(EventStreamProperties eventStreamProperties) {
+ this.eventStreamProperties = eventStreamProperties;
+ return this;
+ }
+
+ @Override
+ public MemberSupplierWrappers build() {
+ return new MemberSupplierWrappers(List.of(
+ new ExactlyOnceMemberSupplierWrapper(context),
+ new LatestStateMemberSupplierWrapper(context, eventStreamProperties),
+ new VersionMaterialisedMemberSupplierWrapper(context, eventStreamProperties.getVersionOfPath())
+ ));
+ }
+}
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/VersionMaterialisedMemberSupplierWrapper.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/VersionMaterialisedMemberSupplierWrapper.java
new file mode 100644
index 000000000..98e3fcb74
--- /dev/null
+++ b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/wrappers/VersionMaterialisedMemberSupplierWrapper.java
@@ -0,0 +1,36 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldi.processors.wrappers;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldi.VersionMaterialiser;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.processors.config.LdesProcessorProperties;
+import ldes.client.treenodesupplier.domain.services.MemberSupplierWrapper;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.VersionMaterialisedMemberSupplier;
+import org.apache.jena.rdf.model.Property;
+import org.apache.jena.rdf.model.ResourceFactory;
+import org.apache.nifi.processor.ProcessContext;
+
+public class VersionMaterialisedMemberSupplierWrapper extends MemberSupplierWrapper {
+ private final ProcessContext context;
+ private final String versionOfPath;
+
+ public VersionMaterialisedMemberSupplierWrapper(ProcessContext context, String versionOfPath) {
+ this.context = context;
+ this.versionOfPath = versionOfPath;
+ }
+
+ @Override
+ protected boolean shouldBeWrapped() {
+ return LdesProcessorProperties.useVersionMaterialisation(context);
+ }
+
+ @Override
+ protected MemberSupplier createWrappedMemberSupplier(MemberSupplier memberSupplier) {
+ return new VersionMaterialisedMemberSupplier(memberSupplier, createVersionMaterialiser());
+ }
+
+ private VersionMaterialiser createVersionMaterialiser() {
+ final Property versionOfPredicate = ResourceFactory.createProperty(versionOfPath);
+ final boolean restrictToMembers = LdesProcessorProperties.restrictToMembers(context);
+ return new VersionMaterialiser(versionOfPredicate, restrictToMembers);
+ }
+}
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/services/LdesPropertiesExtractor.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/services/LdesPropertiesExtractor.java
deleted file mode 100644
index 1af8e216d..000000000
--- a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/services/LdesPropertiesExtractor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package be.vlaanderen.informatievlaanderen.ldes.ldi.services;
-
-import be.vlaanderen.informatievlaanderen.ldes.ldi.domain.valueobjects.LdesProperties;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutor;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.valueobjects.Response;
-import ldes.client.startingtreenode.RedirectRequestExecutor;
-import ldes.client.startingtreenode.domain.valueobjects.RedirectHistory;
-import ldes.client.startingtreenode.domain.valueobjects.StartingNodeRequest;
-import ldes.client.treenodesupplier.domain.valueobject.LdesMetaData;
-import org.apache.jena.rdf.model.*;
-import org.apache.jena.riot.Lang;
-import org.apache.jena.riot.RDFParser;
-
-import java.io.ByteArrayInputStream;
-import java.util.Optional;
-
-import static org.apache.jena.rdf.model.ResourceFactory.createProperty;
-
-public class LdesPropertiesExtractor {
-
- private static final String LDES = "https://w3id.org/ldes#";
- private static final Property LDES_VERSION_OF = createProperty(LDES, "versionOfPath");
- private static final Property LDES_TIMESTAMP_PATH = createProperty(LDES, "timestampPath");
- private static final Property TREE_SHAPE = createProperty("https://w3id.org/tree#", "shape");
- private final RequestExecutor requestExecutor;
-
- public LdesPropertiesExtractor(RequestExecutor requestExecutor) {
- this.requestExecutor = requestExecutor;
- }
-
- private Optional getPropertyValue(Model model, Property property) {
- return model.listStatements(null, property, (Resource) null)
- .nextOptional()
- .map(Statement::getObject)
- .map(RDFNode::asResource)
- .map(Resource::toString);
- }
-
- public LdesProperties getLdesProperties(LdesMetaData ldesMetaData, boolean needTimestampPath,
- boolean needVersionOfPath,
- boolean needShape) {
-
- Model model = getModelFromStartingTreeNode(ldesMetaData.getStartingNodeUrl(), ldesMetaData.getLang());
-
- String timestampPath = getResource(needTimestampPath, model, LDES_TIMESTAMP_PATH);
- String versionOfPath = getResource(needVersionOfPath, model, LDES_VERSION_OF);
- String shape = getResource(needShape, model, TREE_SHAPE);
- return new LdesProperties(timestampPath, versionOfPath, shape);
- }
-
- private Model getModelFromStartingTreeNode(String url, Lang lang) {
- RedirectRequestExecutor redirectRequestExecutor = new RedirectRequestExecutor(requestExecutor);
- Response response = redirectRequestExecutor
- .execute(new StartingNodeRequest(url, lang, new RedirectHistory()));
-
- return RDFParser
- .source(response.getBody().map(ByteArrayInputStream::new).orElseThrow())
- .lang(lang)
- .build()
- .toModel();
- }
-
- private String getResource(boolean resourceNeeded, Model model, Property property) {
- return resourceNeeded
- ? getResource(model, property).orElseThrow(() -> new LdesPropertyNotFoundException(property.toString()))
- : null;
- }
-
- public Optional getResource(Model model, Property property) {
- return getPropertyValue(model, property);
- }
-}
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/services/LdesPropertyNotFoundException.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/services/LdesPropertyNotFoundException.java
deleted file mode 100644
index 746ad57a5..000000000
--- a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldi/services/LdesPropertyNotFoundException.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package be.vlaanderen.informatievlaanderen.ldes.ldi.services;
-
-public class LdesPropertyNotFoundException extends RuntimeException {
-
- public LdesPropertyNotFoundException(String message) {
- super(message);
- }
-}
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/LdesClientProcessorTest.java b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/LdesClientProcessorTest.java
index f6ad97a24..6719fb79a 100644
--- a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/LdesClientProcessorTest.java
+++ b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldi/processors/LdesClientProcessorTest.java
@@ -67,8 +67,7 @@ void tearDown() {
@ArgumentsSource(MatchNumberOfFlowFilesArgumentsProvider.class)
void shouldMatchNumberOfFlowFiles(String dataSourceUrl, int numberOfRuns) {
testRunner.setProperty("DATA_SOURCE_URLS", dataSourceUrl);
- testRunner.setProperty(STATE_PERSISTENCE_STRATEGY,
- "SQLITE");
+ testRunner.setProperty(STATE_PERSISTENCE_STRATEGY, "SQLITE");
testRunner.setProperty("KEEP_STATE", Boolean.FALSE.toString());
@@ -186,7 +185,6 @@ void shouldSupportVersionMaterialisation(Map statePe
testRunner.setProperty("USE_VERSION_MATERIALISATION", Boolean.TRUE.toString());
testRunner.setProperty("USE_LATEST_STATE_FILTER", Boolean.FALSE.toString());
testRunner.setProperty("RESTRICT_TO_MEMBERS", Boolean.FALSE.toString());
- testRunner.setProperty("VERSION_OF_PROPERTY", VERSION_OF);
testRunner.run();
@@ -205,8 +203,7 @@ void shouldSupportVersionMaterialisation(Map statePe
@Test
void when_runningLdesClientWithStreamPropertiesFlags_expectsLdesPropertiesInFlowFile() {
- testRunner.setProperty("DATA_SOURCE_URLS",
- "http://localhost:10101/exampleData?scenario=gml-data");
+ testRunner.setProperty("DATA_SOURCE_URLS", "http://localhost:10101/exampleData?scenario=gml-data");
testRunner.setProperty("STREAM_SHAPE_PROPERTY", Boolean.TRUE.toString());
testRunner.run();
@@ -250,7 +247,6 @@ void shouldSupportOnlyOnceFilter(Map statePersistenc
testRunner.setProperty("KEEP_STATE", Boolean.FALSE.toString());
testRunner.setProperty("USE_EXACTLY_ONCE_FILTER", Boolean.TRUE.toString());
testRunner.setProperty("RESTRICT_TO_MEMBERS", Boolean.FALSE.toString());
- testRunner.setProperty("VERSION_OF_PROPERTY", VERSION_OF);
testRunner.run(4);
@@ -275,7 +271,6 @@ void shouldSupportDisableOfOnlyOnceFilter(Map stateP
testRunner.setProperty("KEEP_STATE", Boolean.FALSE.toString());
testRunner.setProperty("USE_EXACTLY_ONCE_FILTER", Boolean.FALSE.toString());
testRunner.setProperty("RESTRICT_TO_MEMBERS", Boolean.FALSE.toString());
- testRunner.setProperty("VERSION_OF_PROPERTY", VERSION_OF);
testRunner.run(4);
@@ -303,7 +298,6 @@ void shouldSupportVersionMaterialisationWithLatestStateFilter(Map properties;
+
+
+ public TestProcessContext(boolean useExactlyOnceFilter, boolean useVersionMaterialisation, boolean useLatestStateFilter) {
+ properties = Map.of(
+ LdesProcessorProperties.USE_EXACTLY_ONCE_FILTER, String.valueOf(useExactlyOnceFilter),
+ LdesProcessorProperties.USE_VERSION_MATERIALISATION, String.valueOf(useVersionMaterialisation),
+ LdesProcessorProperties.USE_LATEST_STATE_FILTER, String.valueOf(useLatestStateFilter),
+ PersistenceProperties.KEEP_STATE, String.valueOf(false),
+ PersistenceProperties.STATE_PERSISTENCE_STRATEGY, "MEMORY"
+ );
+ }
+
+ public TestProcessContext(boolean useExactlyOnce) {
+ this(useExactlyOnce, false, false);
+ }
+
+ public TestProcessContext(boolean useVersionMaterialisation, boolean useLatestStateFilter) {
+ this(false, useVersionMaterialisation, useLatestStateFilter);
+ }
+
+ @Override
+ public PropertyValue getProperty(PropertyDescriptor descriptor) {
+ return new MockPropertyValue(properties.get(descriptor));
+ }
+}
diff --git a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/resources/mappings/200-response-with-indirect-url.json b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/resources/mappings/200-response-with-indirect-url.json
index 73e9c0ad2..53cd63763 100644
--- a/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/resources/mappings/200-response-with-indirect-url.json
+++ b/ldi-nifi/ldi-nifi-processors/ldes-client-processor/src/test/resources/mappings/200-response-with-indirect-url.json
@@ -5,8 +5,7 @@
},
"response": {
"status": 200,
- "jsonBody":
- [
+ "jsonBody": [
{
"@context": {
"tree": "https://w3id.org/tree#",
@@ -48,6 +47,12 @@
},
"@id": "http://localhost:10101/feed",
"@type": "ldes:EventStream",
+ "ldes:versionOfPath": {
+ "@id": "dc:isVersionOf"
+ },
+ "ldes:timestampPath": {
+ "@id": "dc:modified"
+ },
"tree:view": {
"@id": "http://localhost:10101/feed?page=2023",
"ldes:retentionPolicy": {
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client-connector/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientConnectorAutoConfig.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client-connector/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientConnectorAutoConfig.java
index 668619b83..e2cfd6150 100644
--- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client-connector/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientConnectorAutoConfig.java
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client-connector/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientConnectorAutoConfig.java
@@ -10,21 +10,21 @@
import be.vlaanderen.informatievlaanderen.ldes.ldi.types.LdiAdapter;
import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClient;
import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientConnectorApi;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
import be.vlaanderen.informatievlaanderen.ldes.ldio.event.LdesClientConnectorApiCreatedEvent;
import be.vlaanderen.informatievlaanderen.ldes.ldio.management.status.ClientStatusConsumer;
import be.vlaanderen.informatievlaanderen.ldes.ldio.management.status.ClientStatusService;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.LdioInput;
import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.LdioInputConfigurator;
import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.LdioObserver;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
import io.micrometer.observation.ObservationRegistry;
+import ldes.client.eventstreamproperties.EventStreamPropertiesFetcher;
import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import static be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.persistence.PersistenceProperties.KEEP_STATE;
-
@SuppressWarnings("java:S6830")
@Configuration
@@ -60,6 +60,7 @@ public LdioClientConnectorConfigurator(ApplicationEventPublisher eventPublisher,
@Override
public LdioInput configure(LdiAdapter adapter, ComponentExecutor executor, ApplicationEventPublisher applicationEventPublisher, ComponentProperties properties) {
final String pipelineName = properties.getPipelineName();
+ final LdioLdesClientProperties ldioLdesClientProperties = LdioLdesClientProperties.fromComponentProperties(properties);
final var connectorTransferUrl = properties.getProperty(CONNECTOR_TRANSFER_URL);
final var transferService = new MemoryTransferService(baseRequestExecutor, connectorTransferUrl);
final var memoryTokenServiceLifecycle = new MemoryTokenServiceLifecycle();
@@ -68,10 +69,16 @@ public LdioInput configure(LdiAdapter adapter, ComponentExecutor executor, Appli
final var urlProxy = getEdcUrlProxy(properties);
final var edcRequestExecutor = requestExecutorFactory.createEdcExecutor(baseRequestExecutor, tokenService,
urlProxy);
+ final var eventStreamPropertiesFetcher = new EventStreamPropertiesFetcher(edcRequestExecutor);
final var clientStatusConsumer = new ClientStatusConsumer(pipelineName, clientStatusService);
- final MemberSupplier memberSupplier = new MemberSupplierFactory(properties, edcRequestExecutor,
- clientStatusConsumer).getMemberSupplier();
- final boolean keepState = properties.getOptionalBoolean(KEEP_STATE).orElse(false);
+ final MemberSupplier memberSupplier = new MemberSupplierFactory(
+ ldioLdesClientProperties,
+ eventStreamPropertiesFetcher,
+ edcRequestExecutor,
+ clientStatusConsumer
+ ).getMemberSupplier();
+
+ final boolean keepState = ldioLdesClientProperties.isKeepStateEnabled();
final LdioObserver ldioObserver = LdioObserver.register(NAME, pipelineName, observationRegistry);
final var ldesClient = new LdioLdesClient(executor, ldioObserver, memberSupplier,
applicationEventPublisher, keepState, clientStatusConsumer);
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/pom.xml b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/pom.xml
index 2aa2a57f4..9d51c4456 100644
--- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/pom.xml
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/pom.xml
@@ -66,7 +66,10 @@
be.vlaanderen.informatievlaanderen.ldes.ldi
ldi-infra-sql
- ${project.version}
+
+
+ be.vlaanderen.informatievlaanderen.ldes.client
+ event-stream-properties-fetcher
org.springdoc
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientProperties.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientProperties.java
index 98093452d..22686eb3c 100644
--- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientProperties.java
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientProperties.java
@@ -1,20 +1,100 @@
package be.vlaanderen.informatievlaanderen.ldes.ldio;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.exception.ConfigPropertyMissingException;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.exception.InvalidConfigException;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+import static be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientPropertyKeys.*;
+import static be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.persistence.PersistenceProperties.KEEP_STATE;
+
public class LdioLdesClientProperties {
+ private static final Logger log = LoggerFactory.getLogger(LdioLdesClientProperties.class);
+ public static final boolean DEFAULT_KEEP_STATE = false;
+ public static final boolean DEFAULT_USE_LATEST_STATE_FILTER = true;
+ public static final boolean DEFAULT_EXACTLY_ONCE_ENABLED = true;
+ private final ComponentProperties properties;
- private LdioLdesClientProperties() {
+ private LdioLdesClientProperties(ComponentProperties properties) {
+ this.properties = properties;
}
- // general properties
- public static final String URLS = "urls";
- public static final String SOURCE_FORMAT = "source-format";
+ public List getUrls() {
+ final List urls = properties.getPropertyList(URLS);
+ if (urls.isEmpty()) {
+ throw new ConfigPropertyMissingException(properties.getPipelineName(), properties.getComponentName(), "urls");
+ }
+ return urls;
+ }
- public static final String TIMESTAMP_PATH_PROP = "timestamp-path";
- public static final String USE_EXACTLY_ONCE_FILTER = "enable-exactly-once";
+ public String getFirstUrl() {
+ return getUrls().getFirst();
+ }
- // version materialisation properties
- public static final String USE_VERSION_MATERIALISATION = "materialisation.enabled";
- public static final String VERSION_OF_PROPERTY = "materialisation.version-of-property";
- public static final String USE_LATEST_STATE_FILTER = "materialisation.enable-latest-state";
+ public Lang getSourceFormat() {
+ return properties.getOptionalProperty(SOURCE_FORMAT)
+ .map(RDFLanguages::nameToLang)
+ .orElse(DEFAULT_SOURCE_FORMAT);
+ }
+ public boolean isExactlyOnceEnabled() {
+ return properties.getOptionalBoolean(USE_EXACTLY_ONCE_FILTER)
+ .or(() -> getOptionalVersionMaterialisationBoolean().map(isEnabled -> !isEnabled))
+ .orElse(DEFAULT_EXACTLY_ONCE_ENABLED);
+ }
+
+ public boolean isVersionMaterialisationEnabled() {
+ return getOptionalVersionMaterialisationBoolean().orElse(false);
+ }
+
+ private Optional getOptionalVersionMaterialisationBoolean() {
+ return properties.getOptionalBoolean(USE_VERSION_MATERIALISATION);
+ }
+
+ public boolean isKeepStateEnabled() {
+ return properties.getOptionalBoolean(KEEP_STATE).orElse(DEFAULT_KEEP_STATE);
+ }
+
+ public boolean isLatestStateEnabled() {
+ return properties.getOptionalBoolean(USE_LATEST_STATE_FILTER).orElse(DEFAULT_USE_LATEST_STATE_FILTER);
+ }
+
+ public ComponentProperties getProperties() {
+ return properties;
+ }
+
+ public static LdioLdesClientProperties fromComponentProperties(ComponentProperties properties) {
+ final LdioLdesClientProperties clientProps = new LdioLdesClientProperties(properties);
+ warnWhenVersionMaterialisationIsNotEnabled(clientProps);
+ checkIfBothVersionMaterialisationAndExactlyOnceAreExplicitlyEnabled(clientProps);
+ warnIfExactlyOnceFilterMustBeDisabled(clientProps);
+ return clientProps;
+ }
+
+ private static void warnWhenVersionMaterialisationIsNotEnabled(LdioLdesClientProperties clientProps) {
+ if(clientProps.getOptionalVersionMaterialisationBoolean().isEmpty()) {
+ log.atWarn().log("""
+ Version-materialization in the LDES Client hasn’t been turned on. Please note that in the future, \
+ this will be the default output of the LDES Client \
+ and having version-objects as output will have to be configured explicitly.""");
+ }
+ }
+
+ private static void checkIfBothVersionMaterialisationAndExactlyOnceAreExplicitlyEnabled(LdioLdesClientProperties clientProps) {
+ if(clientProps.isVersionMaterialisationEnabled() && clientProps.isExactlyOnceEnabled()) {
+ throw new InvalidConfigException("The exactly once filter can not be enabled with version materialisation.");
+ }
+ }
+
+ private static void warnIfExactlyOnceFilterMustBeDisabled(LdioLdesClientProperties clientProps) {
+ if(clientProps.isExactlyOnceEnabled()) {
+ log.warn("The exactly once filter can not be used while version materialisation is active, disabling filter");
+ }
+ }
}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientPropertyKeys.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientPropertyKeys.java
new file mode 100644
index 000000000..d5f9b72f1
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientPropertyKeys.java
@@ -0,0 +1,19 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio;
+
+import org.apache.jena.riot.Lang;
+
+public class LdioLdesClientPropertyKeys {
+ private LdioLdesClientPropertyKeys() {
+ }
+
+ // general properties
+ public static final String URLS = "urls";
+ public static final String SOURCE_FORMAT = "source-format";
+ public static final Lang DEFAULT_SOURCE_FORMAT = Lang.TURTLE;
+
+ public static final String USE_EXACTLY_ONCE_FILTER = "enable-exactly-once";
+
+ // version materialisation properties
+ public static final String USE_VERSION_MATERIALISATION = "materialisation.enabled";
+ public static final String USE_LATEST_STATE_FILTER = "materialisation.enable-latest-state";
+}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientAutoConfig.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientAutoConfig.java
index 6e0189ef5..5e6e93a95 100644
--- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientAutoConfig.java
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientAutoConfig.java
@@ -1,24 +1,12 @@
package be.vlaanderen.informatievlaanderen.ldes.ldio.config;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.services.RequestExecutorFactory;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.services.ComponentExecutor;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.types.LdiAdapter;
import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClient;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.management.status.ClientStatusConsumer;
import be.vlaanderen.informatievlaanderen.ldes.ldio.management.status.ClientStatusService;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.LdioInput;
import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.LdioInputConfigurator;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.LdioObserver;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.requestexecutor.LdioRequestExecutorSupplier;
import io.micrometer.observation.ObservationRegistry;
-import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
-import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import static be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.persistence.PersistenceProperties.KEEP_STATE;
-
@Configuration
public class LdioLdesClientAutoConfig {
@SuppressWarnings("java:S6830")
@@ -27,35 +15,4 @@ public LdioInputConfigurator ldioConfigurator(ClientStatusService clientStatusSe
return new LdioLdesClientConfigurator(clientStatusService, observationRegistry);
}
- public static class LdioLdesClientConfigurator implements LdioInputConfigurator {
- private final ClientStatusService clientStatusService;
-
- private final ObservationRegistry observationRegistry;
-
- public LdioLdesClientConfigurator(ClientStatusService clientStatusService, ObservationRegistry observationRegistry) {
- this.clientStatusService = clientStatusService;
- this.observationRegistry = observationRegistry;
- }
-
- @Override
- public LdioInput configure(LdiAdapter adapter, ComponentExecutor componentExecutor,
- ApplicationEventPublisher applicationEventPublisher,
- ComponentProperties properties) {
- String pipelineName = properties.getPipelineName();
- final var requestExecutorFactory = new RequestExecutorFactory(false);
- final var requestExecutor = new LdioRequestExecutorSupplier(requestExecutorFactory).getRequestExecutor(properties);
- final var clientStatusConsumer = new ClientStatusConsumer(pipelineName, clientStatusService);
- final MemberSupplier memberSupplier = new MemberSupplierFactory(properties, requestExecutor, clientStatusConsumer).getMemberSupplier();
- final boolean keepState = properties.getOptionalBoolean(KEEP_STATE).orElse(false);
- final LdioObserver ldioObserver = LdioObserver.register(LdioLdesClient.NAME, pipelineName, observationRegistry);
- final var ldesClient = new LdioLdesClient(componentExecutor, ldioObserver, memberSupplier, applicationEventPublisher, keepState, clientStatusConsumer);
- ldesClient.start();
- return ldesClient;
- }
-
- @Override
- public boolean isAdapterRequired() {
- return false;
- }
- }
}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientConfigurator.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientConfigurator.java
new file mode 100644
index 000000000..351306773
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientConfigurator.java
@@ -0,0 +1,52 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio.config;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.services.RequestExecutorFactory;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.services.ComponentExecutor;
+import be.vlaanderen.informatievlaanderen.ldes.ldi.types.LdiAdapter;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClient;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.management.status.ClientStatusConsumer;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.management.status.ClientStatusService;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.LdioInput;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.LdioInputConfigurator;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.LdioObserver;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.requestexecutor.LdioRequestExecutorSupplier;
+import io.micrometer.observation.ObservationRegistry;
+import ldes.client.eventstreamproperties.EventStreamPropertiesFetcher;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import org.springframework.context.ApplicationEventPublisher;
+
+public class LdioLdesClientConfigurator implements LdioInputConfigurator {
+ private final ClientStatusService clientStatusService;
+ private final ObservationRegistry observationRegistry;
+ private final LdioRequestExecutorSupplier requestExecutorSupplier;
+
+ public LdioLdesClientConfigurator(ClientStatusService clientStatusService, ObservationRegistry observationRegistry) {
+ this.clientStatusService = clientStatusService;
+ this.observationRegistry = observationRegistry;
+ requestExecutorSupplier = new LdioRequestExecutorSupplier(new RequestExecutorFactory(false));
+ }
+
+ @Override
+ public LdioInput configure(LdiAdapter adapter, ComponentExecutor componentExecutor,
+ ApplicationEventPublisher applicationEventPublisher,
+ ComponentProperties properties) {
+ final String pipelineName = properties.getPipelineName();
+ final LdioLdesClientProperties ldioLdesClientProperties = LdioLdesClientProperties.fromComponentProperties(properties);
+ final var requestExecutor = requestExecutorSupplier.getRequestExecutor(properties);
+ final EventStreamPropertiesFetcher eventStreamPropertiesFetcher = new EventStreamPropertiesFetcher(requestExecutor);
+ final var clientStatusConsumer = new ClientStatusConsumer(pipelineName, clientStatusService);
+ final MemberSupplier memberSupplier = new MemberSupplierFactory(ldioLdesClientProperties, eventStreamPropertiesFetcher, requestExecutor, clientStatusConsumer).getMemberSupplier();
+ final boolean keepState = ldioLdesClientProperties.isKeepStateEnabled();
+ final LdioObserver ldioObserver = LdioObserver.register(LdioLdesClient.NAME, pipelineName, observationRegistry);
+ final var ldesClient = new LdioLdesClient(componentExecutor, ldioObserver, memberSupplier, applicationEventPublisher, keepState, clientStatusConsumer);
+ ldesClient.start();
+ return ldesClient;
+ }
+
+ @Override
+ public boolean isAdapterRequired() {
+ return false;
+ }
+}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/MemberSupplierFactory.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/MemberSupplierFactory.java
index 0783a68f4..f1de9ca78 100644
--- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/MemberSupplierFactory.java
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/MemberSupplierFactory.java
@@ -1,156 +1,60 @@
package be.vlaanderen.informatievlaanderen.ldes.ldio.config;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.VersionMaterialiser;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.timestampextractor.TimestampExtractor;
-import be.vlaanderen.informatievlaanderen.ldes.ldi.timestampextractor.TimestampFromCurrentTimeExtractor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.timestampextractor.TimestampFromPathExtractor;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.exception.ConfigPropertyMissingException;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.exception.InvalidConfigException;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.config.wrappers.MemberSupplierWrappersBuilder;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.management.status.ClientStatusConsumer;
+import ldes.client.eventstreamproperties.EventStreamPropertiesFetcher;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.eventstreamproperties.valueobjects.PropertiesRequest;
import ldes.client.treenodesupplier.TreeNodeProcessor;
-import ldes.client.treenodesupplier.domain.valueobject.ClientStatus;
import ldes.client.treenodesupplier.domain.valueobject.LdesMetaData;
import ldes.client.treenodesupplier.domain.valueobject.StatePersistence;
-import ldes.client.treenodesupplier.filters.ExactlyOnceFilter;
-import ldes.client.treenodesupplier.filters.LatestStateFilter;
-import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
import ldes.client.treenodesupplier.membersuppliers.MemberSupplierImpl;
-import ldes.client.treenodesupplier.membersuppliers.VersionMaterialisedMemberSupplier;
-import org.apache.jena.rdf.model.Property;
-import org.apache.jena.rdf.model.ResourceFactory;
-import org.apache.jena.riot.Lang;
-import org.apache.jena.riot.RDFLanguages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Consumer;
-
-import static be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties.*;
-import static be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.persistence.PersistenceProperties.KEEP_STATE;
import static org.apache.jena.rdf.model.ResourceFactory.createProperty;
public class MemberSupplierFactory {
-
- private static final String DEFAULT_VERSION_OF_KEY = "http://purl.org/dc/terms/isVersionOf";
- private static final String DEFAULT_TIMESTAMP_KEY = "http://www.w3.org/ns/prov#generatedAtTime";
-
- private final Logger log = LoggerFactory.getLogger(MemberSupplierFactory.class);
-
- private final ComponentProperties properties;
- private final RequestExecutor requestExecutor;
- private final Consumer clientStatusConsumer;
-
- public MemberSupplierFactory(ComponentProperties properties, RequestExecutor requestExecutor,
- Consumer clientStatusConsumer) {
- this.properties = properties;
- this.requestExecutor = requestExecutor;
- this.clientStatusConsumer = clientStatusConsumer;
- }
-
- public MemberSupplier getMemberSupplier() {
- log.info("Starting LdesClientRunner run setup");
- log.info("LdesClientRunner setup finished");
- MemberSupplier baseMemberSupplier =
- new MemberSupplierImpl(getTreeNodeProcessor(), getKeepState());
- if (useExactlyOnceFilter()) {
- return new FilteredMemberSupplier(baseMemberSupplier, getExactlyOnceFilter());
- } else if (useVersionMaterialisation()) {
- MemberSupplier decoratedMemberSupplier = useLatestStateFilter()
- ? new FilteredMemberSupplier(baseMemberSupplier, getLatestStateFilter())
- : baseMemberSupplier;
- return new VersionMaterialisedMemberSupplier(decoratedMemberSupplier, createVersionMaterialiser());
- } else {
- return baseMemberSupplier;
- }
- }
-
- private LatestStateFilter getLatestStateFilter() {
- String timestampPath = properties.getOptionalProperty(TIMESTAMP_PATH_PROP).orElse(DEFAULT_TIMESTAMP_KEY);
- String versionOfPath = properties.getOptionalProperty(VERSION_OF_PROPERTY).orElse(DEFAULT_VERSION_OF_KEY);
- return new LatestStateFilter(getStatePersistence().getMemberVersionRepository(), getKeepState(), timestampPath, versionOfPath);
- }
-
- private ExactlyOnceFilter getExactlyOnceFilter() {
- return new ExactlyOnceFilter(getStatePersistence().getMemberIdRepository(), getKeepState());
- }
-
- private TreeNodeProcessor getTreeNodeProcessor() {
- List targetUrls = properties.getPropertyList(URLS);
-
- if (targetUrls.isEmpty()) {
- throw new ConfigPropertyMissingException(properties.getPipelineName(), properties.getComponentName(), "urls");
- }
-
- Lang sourceFormat = getSourceFormat();
- LdesMetaData ldesMetaData = new LdesMetaData(targetUrls, sourceFormat);
- TimestampExtractor timestampExtractor = properties.getOptionalProperty(TIMESTAMP_PATH_PROP)
- .map(timestampPath -> (TimestampExtractor) new TimestampFromPathExtractor(createProperty(timestampPath)))
- .orElseGet(TimestampFromCurrentTimeExtractor::new);
-
- return new TreeNodeProcessor(ldesMetaData, getStatePersistence(), requestExecutor, timestampExtractor, clientStatusConsumer);
- }
-
- private StatePersistence getStatePersistence() {
- return new StatePersistenceFactory().getStatePersistence(properties);
- }
-
- private Boolean getKeepState() {
- return properties.getOptionalBoolean(KEEP_STATE).orElse(false);
- }
-
- private boolean useVersionMaterialisation() {
- return properties
- .getOptionalBoolean(USE_VERSION_MATERIALISATION)
- .orElseGet(() -> {
- log.warn("Version-materialization in the LDES Client hasn’t been turned on. " +
- "Please note that in the future, this will be the default output of the LDES Client " +
- "and having version-objects as output will have to be configured explicitly.");
- return false;
- });
- }
-
- private boolean useExactlyOnceFilter() {
- Optional exactlyOneFilterProperty = properties.getOptionalBoolean(USE_EXACTLY_ONCE_FILTER);
- if (exactlyOneFilterProperty.isPresent()) {
- // use filter is explicitly set
- boolean useFilter = exactlyOneFilterProperty.get();
- if (useVersionMaterialisation() && useFilter) {
- throw new InvalidConfigException("The exactly once filter can not be enabled with version materialisation.");
- } else {
- return useFilter;
- }
- } else {
- // use filter is not explicitly set
- if (useVersionMaterialisation()) {
- log.warn("The exactly once filter can not be used while version materialisation is active, disabling filter");
- return false;
- } else {
- return true;
- }
- }
- }
-
- private Lang getSourceFormat() {
- return properties.getOptionalProperty(SOURCE_FORMAT)
- .map(RDFLanguages::nameToLang)
- .orElse(Lang.TURTLE);
- }
-
- private VersionMaterialiser createVersionMaterialiser() {
- final Property versionOfProperty = properties
- .getOptionalProperty(VERSION_OF_PROPERTY)
- .map(ResourceFactory::createProperty)
- .orElseGet(() -> createProperty(DEFAULT_VERSION_OF_KEY));
- return new VersionMaterialiser(versionOfProperty, false);
- }
-
- private boolean useLatestStateFilter() {
- return properties.getOptionalBoolean(USE_LATEST_STATE_FILTER).orElse(true);
- }
-
+ private static final Logger log = LoggerFactory.getLogger(MemberSupplierFactory.class);
+ private final LdioLdesClientProperties clientProperties;
+ private final RequestExecutor requestExecutor;
+ private final ClientStatusConsumer clientStatusConsumer;
+ private final EventStreamPropertiesFetcher eventStreamPropertiesFetcher;
+
+ public MemberSupplierFactory(LdioLdesClientProperties clientProperties,
+ EventStreamPropertiesFetcher eventStreamPropertiesFetcher,
+ RequestExecutor requestExecutor,
+ ClientStatusConsumer clientStatusConsumer ) {
+ this.clientProperties = clientProperties;
+ this.requestExecutor = requestExecutor;
+ this.clientStatusConsumer = clientStatusConsumer;
+ this.eventStreamPropertiesFetcher = eventStreamPropertiesFetcher;
+ }
+
+ public MemberSupplier getMemberSupplier() {
+ log.info("Starting LdesClientRunner run setup");
+ final EventStreamProperties eventStreamProperties = eventStreamPropertiesFetcher.fetchEventStreamProperties(new PropertiesRequest(clientProperties.getFirstUrl(), clientProperties.getSourceFormat()));
+ MemberSupplier baseMemberSupplier = new MemberSupplierImpl(getTreeNodeProcessor(eventStreamProperties), clientProperties.isKeepStateEnabled());
+ baseMemberSupplier = new MemberSupplierWrappersBuilder()
+ .withEventStreamProperties(eventStreamProperties)
+ .withLdioLdesClientProperties(clientProperties)
+ .build()
+ .wrapMemberSupplier(baseMemberSupplier);
+
+ log.info("LdesClientRunner setup finished");
+ return baseMemberSupplier;
+ }
+
+ private TreeNodeProcessor getTreeNodeProcessor(EventStreamProperties eventStreamProperties) {
+ final StatePersistence statePersistence = new StatePersistenceFactory().getStatePersistence(clientProperties.getProperties());
+ LdesMetaData ldesMetaData = new LdesMetaData(clientProperties.getUrls(), clientProperties.getSourceFormat());
+ TimestampExtractor timestampExtractor = new TimestampFromPathExtractor(createProperty(eventStreamProperties.getTimestampPath()));
+ return new TreeNodeProcessor(ldesMetaData, statePersistence, requestExecutor, timestampExtractor, clientStatusConsumer);
+ }
}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/ExactlyOnceMemberSupplierWrapper.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/ExactlyOnceMemberSupplierWrapper.java
new file mode 100644
index 000000000..530795812
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/ExactlyOnceMemberSupplierWrapper.java
@@ -0,0 +1,33 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio.config.wrappers;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.config.StatePersistenceFactory;
+import ldes.client.treenodesupplier.domain.services.MemberSupplierWrapper;
+import ldes.client.treenodesupplier.domain.valueobject.StatePersistence;
+import ldes.client.treenodesupplier.filters.ExactlyOnceFilter;
+import ldes.client.treenodesupplier.filters.MemberFilter;
+import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+
+public class ExactlyOnceMemberSupplierWrapper extends MemberSupplierWrapper {
+ private final LdioLdesClientProperties properties;
+
+ public ExactlyOnceMemberSupplierWrapper(LdioLdesClientProperties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public boolean shouldBeWrapped() {
+ return properties.isExactlyOnceEnabled();
+ }
+
+ @Override
+ protected MemberSupplier createWrappedMemberSupplier(MemberSupplier memberSupplier) {
+ return new FilteredMemberSupplier(memberSupplier, createExactlyOnceFilter());
+ }
+
+ private MemberFilter createExactlyOnceFilter() {
+ final StatePersistence statePersistence = new StatePersistenceFactory().getStatePersistence(properties.getProperties());
+ return new ExactlyOnceFilter(statePersistence.getMemberIdRepository(), properties.isKeepStateEnabled());
+ }
+}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/LatestStateMemberSupplierWrapper.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/LatestStateMemberSupplierWrapper.java
new file mode 100644
index 000000000..19ab7864e
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/LatestStateMemberSupplierWrapper.java
@@ -0,0 +1,36 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio.config.wrappers;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.config.StatePersistenceFactory;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.treenodesupplier.domain.services.MemberSupplierWrapper;
+import ldes.client.treenodesupplier.domain.valueobject.StatePersistence;
+import ldes.client.treenodesupplier.filters.LatestStateFilter;
+import ldes.client.treenodesupplier.filters.MemberFilter;
+import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+
+public class LatestStateMemberSupplierWrapper extends MemberSupplierWrapper {
+ private final EventStreamProperties eventStreamProperties;
+ private final LdioLdesClientProperties ldioLdesClientProperties;
+
+ public LatestStateMemberSupplierWrapper(EventStreamProperties eventStreamProperties, LdioLdesClientProperties ldioLdesClientProperties) {
+ this.eventStreamProperties = eventStreamProperties;
+ this.ldioLdesClientProperties = ldioLdesClientProperties;
+ }
+
+ @Override
+ public boolean shouldBeWrapped() {
+ return ldioLdesClientProperties.isVersionMaterialisationEnabled() && ldioLdesClientProperties.isLatestStateEnabled();
+ }
+
+ @Override
+ protected MemberSupplier createWrappedMemberSupplier(MemberSupplier memberSupplier) {
+ return new FilteredMemberSupplier(memberSupplier, createLatestStateFilter());
+ }
+
+ private MemberFilter createLatestStateFilter() {
+ final StatePersistence statePersistence = new StatePersistenceFactory().getStatePersistence(ldioLdesClientProperties.getProperties());
+ return new LatestStateFilter(statePersistence.getMemberVersionRepository(), ldioLdesClientProperties.isKeepStateEnabled(), eventStreamProperties.getTimestampPath(), eventStreamProperties.getVersionOfPath());
+ }
+}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/MemberSupplierWrappersBuilder.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/MemberSupplierWrappersBuilder.java
new file mode 100644
index 000000000..dbbe7b486
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/MemberSupplierWrappersBuilder.java
@@ -0,0 +1,30 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio.config.wrappers;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.treenodesupplier.domain.services.MemberSupplierWrappers;
+
+import java.util.List;
+
+public class MemberSupplierWrappersBuilder implements MemberSupplierWrappers.Builder {
+ private EventStreamProperties eventStreamProperties;
+ private LdioLdesClientProperties ldioLdesClientProperties;
+
+ public MemberSupplierWrappersBuilder withEventStreamProperties(EventStreamProperties eventStreamProperties) {
+ this.eventStreamProperties = eventStreamProperties;
+ return this;
+ }
+
+ public MemberSupplierWrappersBuilder withLdioLdesClientProperties(LdioLdesClientProperties ldioLdesClientProperties) {
+ this.ldioLdesClientProperties = ldioLdesClientProperties;
+ return this;
+ }
+
+ public MemberSupplierWrappers build() {
+ return new MemberSupplierWrappers(List.of(
+ new ExactlyOnceMemberSupplierWrapper(ldioLdesClientProperties),
+ new LatestStateMemberSupplierWrapper(eventStreamProperties, ldioLdesClientProperties),
+ new VersionMaterialisedMemberSupplierWrapper(eventStreamProperties, ldioLdesClientProperties)
+ ));
+ }
+}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/VersionMaterialisedMemberSupplierWrapper.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/VersionMaterialisedMemberSupplierWrapper.java
new file mode 100644
index 000000000..eff99ebe7
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/wrappers/VersionMaterialisedMemberSupplierWrapper.java
@@ -0,0 +1,35 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio.config.wrappers;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldi.VersionMaterialiser;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.treenodesupplier.domain.services.MemberSupplierWrapper;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.VersionMaterialisedMemberSupplier;
+import org.apache.jena.rdf.model.Property;
+import org.apache.jena.rdf.model.ResourceFactory;
+
+public class VersionMaterialisedMemberSupplierWrapper extends MemberSupplierWrapper {
+ private final EventStreamProperties eventStreamProperties;
+ private final LdioLdesClientProperties clientProperties;
+
+ public VersionMaterialisedMemberSupplierWrapper(EventStreamProperties eventStreamProperties, LdioLdesClientProperties clientProperties) {
+ this.eventStreamProperties = eventStreamProperties;
+ this.clientProperties = clientProperties;
+ }
+
+ @Override
+ public boolean shouldBeWrapped() {
+ return clientProperties.isVersionMaterialisationEnabled();
+ }
+
+ @Override
+ protected MemberSupplier createWrappedMemberSupplier(MemberSupplier memberSupplier) {
+ return new VersionMaterialisedMemberSupplier(memberSupplier, createVersionMaterialiser());
+ }
+
+ private VersionMaterialiser createVersionMaterialiser() {
+ final Property versionOfPath = ResourceFactory.createProperty(eventStreamProperties.getVersionOfPath());
+ return new VersionMaterialiser(versionOfPath, false);
+ }
+}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/management/status/ClientStatusConverter.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/management/status/ClientStatusConverter.java
new file mode 100644
index 000000000..305d75331
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/main/java/be/vlaanderen/informatievlaanderen/ldes/ldio/management/status/ClientStatusConverter.java
@@ -0,0 +1,42 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio.management.status;
+
+import ldes.client.treenodesupplier.domain.valueobject.ClientStatus;
+import org.springframework.http.HttpInputMessage;
+import org.springframework.http.HttpOutputMessage;
+import org.springframework.http.MediaType;
+import org.springframework.http.converter.HttpMessageConverter;
+import org.springframework.http.converter.HttpMessageNotReadableException;
+import org.springframework.http.converter.HttpMessageNotWritableException;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+@Component
+public class ClientStatusConverter implements HttpMessageConverter {
+ @Override
+ public boolean canRead(Class> clazz, MediaType mediaType) {
+ return clazz.equals(ClientStatus.class);
+ }
+
+ @Override
+ public boolean canWrite(Class> clazz, MediaType mediaType) {
+ return clazz.equals(ClientStatus.class);
+ }
+
+ @Override
+ public List getSupportedMediaTypes() {
+ return List.of(MediaType.ALL);
+ }
+
+ @Override
+ public ClientStatus read(Class extends ClientStatus> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException {
+ return ClientStatus.valueOf(inputMessage.getBody().toString().toUpperCase());
+ }
+
+ @Override
+ public void write(ClientStatus clientStatus, MediaType contentType, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
+ outputMessage.getBody().write(clientStatus.toString().getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientPropertiesTest.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientPropertiesTest.java
new file mode 100644
index 000000000..cb800abd3
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/LdioLdesClientPropertiesTest.java
@@ -0,0 +1,24 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.exception.InvalidConfigException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class LdioLdesClientPropertiesTest {
+
+ @Test
+ void given_ExactlyOnceAndVersionMaterialisationAreBothExplicitlyEnabled_when_parseConfig_then_ThrowException() {
+ final ComponentProperties properties = new ComponentProperties("pipeline", "cname", Map.of(
+ LdioLdesClientPropertyKeys.USE_EXACTLY_ONCE_FILTER, String.valueOf(true),
+ LdioLdesClientPropertyKeys.USE_VERSION_MATERIALISATION, String.valueOf(true)
+ ));
+
+ assertThatThrownBy(() -> LdioLdesClientProperties.fromComponentProperties(properties))
+ .isInstanceOf(InvalidConfigException.class)
+ .hasMessage("Invalid config: \"The exactly once filter can not be enabled with version materialisation.\" .");
+ }
+}
\ No newline at end of file
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/ExactlyOnceMemberSupplierWrapperTest.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/ExactlyOnceMemberSupplierWrapperTest.java
new file mode 100644
index 000000000..70b88a4c8
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/ExactlyOnceMemberSupplierWrapperTest.java
@@ -0,0 +1,47 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio.config;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.config.wrappers.ExactlyOnceMemberSupplierWrapper;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
+import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class ExactlyOnceMemberSupplierWrapperTest {
+ @Mock
+ private ComponentProperties componentProperties;
+ @Mock
+ private LdioLdesClientProperties ldioLdesClientProperties;
+ @Mock
+ private MemberSupplier baseSupplier;
+ @InjectMocks
+ private ExactlyOnceMemberSupplierWrapper exactlyOnceMemberSupplierWrapper;
+
+ @Test
+ void given_ExactlyOnceEnabled_when_wrap_then_ReturnFilteredMemberSupplier() {
+ when(ldioLdesClientProperties.isExactlyOnceEnabled()).thenReturn(true);
+ when(ldioLdesClientProperties.getProperties()).thenReturn(componentProperties);
+
+ final MemberSupplier memberSupplier = exactlyOnceMemberSupplierWrapper.wrapMemberSupplier(baseSupplier);
+
+ assertThat(memberSupplier).isInstanceOf(FilteredMemberSupplier.class);
+ }
+
+ @Test
+ void given_ExactlyOnceDisabled_when_wrap_then_ReturnBaseMemberSupplier() {
+ when(ldioLdesClientProperties.isExactlyOnceEnabled()).thenReturn(false);
+
+ final MemberSupplier memberSupplier = exactlyOnceMemberSupplierWrapper.wrapMemberSupplier(baseSupplier);
+
+ assertThat(memberSupplier).isSameAs(baseSupplier);
+ }
+
+}
\ No newline at end of file
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LatestStateMemberSupplierWrapperTest.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LatestStateMemberSupplierWrapperTest.java
new file mode 100644
index 000000000..141614c34
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LatestStateMemberSupplierWrapperTest.java
@@ -0,0 +1,64 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio.config;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.config.wrappers.LatestStateMemberSupplierWrapper;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class LatestStateMemberSupplierWrapperTest {
+ @Mock
+ private ComponentProperties componentProperties;
+ @Mock
+ private LdioLdesClientProperties ldioLdesClientProperties;
+ @Mock
+ private MemberSupplier baseSupplier;
+ private LatestStateMemberSupplierWrapper latestStateMemberSupplierWrapper;
+
+ @BeforeEach
+ void setUp() {
+ final EventStreamProperties eventStreamProperties = new EventStreamProperties("test", "test", "test", "test");
+ latestStateMemberSupplierWrapper = new LatestStateMemberSupplierWrapper(eventStreamProperties, ldioLdesClientProperties);
+ }
+
+ @Test
+ void given_LatestStateEnabled_when_wrap_then_ReturnFilteredMemberSupplier() {
+ when(ldioLdesClientProperties.isLatestStateEnabled()).thenReturn(true);
+ when(ldioLdesClientProperties.isVersionMaterialisationEnabled()).thenReturn(true);
+ when(ldioLdesClientProperties.getProperties()).thenReturn(componentProperties);
+
+ final MemberSupplier memberSupplier = latestStateMemberSupplierWrapper.wrapMemberSupplier(baseSupplier);
+
+ assertThat(memberSupplier).isInstanceOf(FilteredMemberSupplier.class);
+ }
+
+ @Test
+ void given_VersionMaterialisationDisabled_when_wrap_then_ReturnBaseSupplier() {
+ when(ldioLdesClientProperties.isVersionMaterialisationEnabled()).thenReturn(false);
+ final MemberSupplier memberSupplier = latestStateMemberSupplierWrapper.wrapMemberSupplier(baseSupplier);
+
+ assertThat(memberSupplier).isSameAs(baseSupplier);
+ }
+
+ @Test
+ void given_LatestStateDisabled_when_wrap_then_ReturnBaseSupplier() {
+ when(ldioLdesClientProperties.isVersionMaterialisationEnabled()).thenReturn(true);
+ when(ldioLdesClientProperties.isLatestStateEnabled()).thenReturn(false);
+ final MemberSupplier memberSupplier = latestStateMemberSupplierWrapper.wrapMemberSupplier(baseSupplier);
+
+ assertThat(memberSupplier).isSameAs(baseSupplier);
+ }
+
+
+
+}
\ No newline at end of file
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientITSteps.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientITSteps.java
index 23d46a6a1..c10495b4e 100644
--- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientITSteps.java
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/LdioLdesClientITSteps.java
@@ -1,7 +1,7 @@
package be.vlaanderen.informatievlaanderen.ldes.ldio.config;
import be.vlaanderen.informatievlaanderen.ldes.ldi.services.ComponentExecutor;
-import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientPropertyKeys;
import be.vlaanderen.informatievlaanderen.ldes.ldio.management.status.ClientStatusService;
import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
import com.github.tomakehurst.wiremock.WireMockServer;
@@ -22,13 +22,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import static be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClient.NAME;
-import static be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties.URLS;
+import static be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientPropertyKeys.URLS;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.*;
public class LdioLdesClientITSteps extends LdesClientInIT {
- private final static WireMockServer wireMockServer = new WireMockServer(options().port(10101));
+ private static final WireMockServer wireMockServer = new WireMockServer(options().port(10101));
private final String pipelineName = "pipelineName";
private final ApplicationEventPublisher applicationEventPublisher = applicationEventPublisher();
private final Map componentPropsMap = new HashMap<>();
@@ -52,7 +52,7 @@ public void iWantToFollowTheFollowingLDES(List urls) {
@And("I configure this to be of RDF format {string}")
public void iConfigureThisToBeOfRDFFormat(String contentType) {
- componentPropsMap.put(LdioLdesClientProperties.SOURCE_FORMAT, contentType);
+ componentPropsMap.put(LdioLdesClientPropertyKeys.SOURCE_FORMAT, contentType);
}
@When("^I start an ldes-ldio-in component")
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/MemberSupplierFactoryTest.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/MemberSupplierFactoryTest.java
index 6b8968578..366f2a990 100644
--- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/MemberSupplierFactoryTest.java
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/MemberSupplierFactoryTest.java
@@ -1,8 +1,11 @@
package be.vlaanderen.informatievlaanderen.ldes.ldio.config;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
import be.vlaanderen.informatievlaanderen.ldes.ldio.management.status.ClientStatusConsumer;
import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.creation.valueobjects.ComponentProperties;
import be.vlaanderen.informatievlaanderen.ldes.ldio.pipeline.exception.ConfigPropertyMissingException;
+import ldes.client.eventstreamproperties.EventStreamPropertiesFetcher;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
import ldes.client.treenodesupplier.filters.LatestStateFilter;
import ldes.client.treenodesupplier.membersuppliers.FilteredMemberSupplier;
import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
@@ -10,32 +13,45 @@
import ldes.client.treenodesupplier.membersuppliers.VersionMaterialisedMemberSupplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
import java.util.HashMap;
import java.util.Map;
-import static be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties.*;
+import static be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientPropertyKeys.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.Mockito.mock;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+@ExtendWith(MockitoExtension.class)
class MemberSupplierFactoryTest {
private Map defaultInputConfig;
- private final ClientStatusConsumer statusConsumer = mock(ClientStatusConsumer.class);
+ @Mock
+ private ClientStatusConsumer statusConsumer;
+ @Mock
+ private EventStreamPropertiesFetcher fetcher;
+ private LdioLdesClientProperties ldioLdesClientProperties;
+ private EventStreamProperties eventStreamProperties;
@BeforeEach
void setUp() {
defaultInputConfig = new HashMap<>();
defaultInputConfig.put(URLS, "http://example.org");
+ eventStreamProperties = new EventStreamProperties("http://localhost:8080/collection", "versionOf", "timestamp", "shaclUri");
}
@Test
void when_VersionMaterialisationIsEnabled_then_VersionMaterialisedMemberSupplierIsReturned() {
defaultInputConfig.put(USE_VERSION_MATERIALISATION, "true");
final var componentProperties = new ComponentProperties("pipelineName", "cName", defaultInputConfig);
+ ldioLdesClientProperties = LdioLdesClientProperties.fromComponentProperties(componentProperties);
+ when(fetcher.fetchEventStreamProperties(any())).thenReturn(eventStreamProperties);
- MemberSupplier memberSupplier = new MemberSupplierFactory(componentProperties, null, statusConsumer).getMemberSupplier();
+ MemberSupplier memberSupplier = new MemberSupplierFactory(ldioLdesClientProperties, fetcher, null, statusConsumer).getMemberSupplier();
assertThat(memberSupplier).isInstanceOf(VersionMaterialisedMemberSupplier.class);
}
@@ -44,16 +60,21 @@ void when_VersionMaterialisationIsEnabled_then_VersionMaterialisedMemberSupplier
void when_VersionMaterialisationAndOnlyOnceFilterAreNotEnabled_then_MemberSupplierImplIsReturned() {
defaultInputConfig.put(USE_EXACTLY_ONCE_FILTER, "false");
final var componentProperties = new ComponentProperties("pipelineName", "cName", defaultInputConfig);
+ ldioLdesClientProperties = LdioLdesClientProperties.fromComponentProperties(componentProperties);
+ when(fetcher.fetchEventStreamProperties(any())).thenReturn(eventStreamProperties);
- MemberSupplier memberSupplier = new MemberSupplierFactory(componentProperties, null, statusConsumer).getMemberSupplier();
+ MemberSupplier memberSupplier = new MemberSupplierFactory(ldioLdesClientProperties, fetcher, null, statusConsumer).getMemberSupplier();
assertThat(memberSupplier).isInstanceOf(MemberSupplierImpl.class);
}
+
@Test
void when_VersionMaterialisationIsNotEnabled_then_OnlyOnceMemberSupplierIsReturned() {
final var componentProperties = new ComponentProperties("pipelineName", "cName", defaultInputConfig);
+ ldioLdesClientProperties = LdioLdesClientProperties.fromComponentProperties(componentProperties);
+ when(fetcher.fetchEventStreamProperties(any())).thenReturn(eventStreamProperties);
- MemberSupplier memberSupplier = new MemberSupplierFactory(componentProperties, null, statusConsumer).getMemberSupplier();
+ MemberSupplier memberSupplier = new MemberSupplierFactory(ldioLdesClientProperties, fetcher, null, statusConsumer).getMemberSupplier();
assertThat(memberSupplier).isInstanceOf(FilteredMemberSupplier.class);
}
@@ -63,8 +84,10 @@ void when_LatestStateFilterIsEnabled_then_returnVersionMaterialisedMemberSupplie
defaultInputConfig.put(USE_VERSION_MATERIALISATION, "true");
defaultInputConfig.put(USE_LATEST_STATE_FILTER, "true");
final var componentProperties = new ComponentProperties("pipelineName", "cName", defaultInputConfig);
+ ldioLdesClientProperties = LdioLdesClientProperties.fromComponentProperties(componentProperties);
+ when(fetcher.fetchEventStreamProperties(any())).thenReturn(eventStreamProperties);
- MemberSupplier memberSupplier = new MemberSupplierFactory(componentProperties, null, statusConsumer).getMemberSupplier();
+ MemberSupplier memberSupplier = new MemberSupplierFactory(ldioLdesClientProperties, fetcher, null, statusConsumer).getMemberSupplier();
assertThat(memberSupplier)
.isInstanceOf(VersionMaterialisedMemberSupplier.class)
@@ -77,8 +100,10 @@ void when_LatestStateFilterIsDisabled_then_returnVersionMaterialisedMemberSuppli
defaultInputConfig.put(USE_VERSION_MATERIALISATION, "true");
defaultInputConfig.put(USE_LATEST_STATE_FILTER, "false");
final var componentProperties = new ComponentProperties("pipelineName", "cName", defaultInputConfig);
+ ldioLdesClientProperties = LdioLdesClientProperties.fromComponentProperties(componentProperties);
+ when(fetcher.fetchEventStreamProperties(any())).thenReturn(eventStreamProperties);
- MemberSupplier memberSupplier = new MemberSupplierFactory(componentProperties, null, statusConsumer).getMemberSupplier();
+ MemberSupplier memberSupplier = new MemberSupplierFactory(ldioLdesClientProperties, fetcher, null, statusConsumer).getMemberSupplier();
assertThat(memberSupplier)
.isInstanceOf(VersionMaterialisedMemberSupplier.class)
@@ -91,8 +116,10 @@ void when_LatestStateFilterIsEnabledWithoutMaterialisation_then_returnMemberSupp
defaultInputConfig.put(USE_EXACTLY_ONCE_FILTER, "false");
defaultInputConfig.put(USE_LATEST_STATE_FILTER, "true");
final var componentProperties = new ComponentProperties("pipelineName", "cName", defaultInputConfig);
+ ldioLdesClientProperties = LdioLdesClientProperties.fromComponentProperties(componentProperties);
+ when(fetcher.fetchEventStreamProperties(any())).thenReturn(eventStreamProperties);
- MemberSupplier memberSupplier = new MemberSupplierFactory(componentProperties, null, statusConsumer).getMemberSupplier();
+ MemberSupplier memberSupplier = new MemberSupplierFactory(ldioLdesClientProperties, fetcher, null, statusConsumer).getMemberSupplier();
assertThat(memberSupplier).isInstanceOf(MemberSupplierImpl.class);
}
@@ -101,7 +128,9 @@ void when_LatestStateFilterIsEnabledWithoutMaterialisation_then_returnMemberSupp
void when_NoUrlsAreConfigured_then_ThrowException() {
final String expectedErrorMessage = "Pipeline \"pipelineName\": \"cName\" : Missing value for property \"urls\" .";
final var componentProperties = new ComponentProperties("pipelineName", "cName", Map.of("url", "http://localhost:8080/ldes"));
- final MemberSupplierFactory memberSupplierFactory = new MemberSupplierFactory(componentProperties, null, statusConsumer);
+ ldioLdesClientProperties = LdioLdesClientProperties.fromComponentProperties(componentProperties);
+
+ MemberSupplierFactory memberSupplierFactory = new MemberSupplierFactory(ldioLdesClientProperties, fetcher, null, statusConsumer);
assertThatThrownBy(memberSupplierFactory::getMemberSupplier)
.isInstanceOf(ConfigPropertyMissingException.class)
.hasMessage(expectedErrorMessage);
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/VersionMaterialisedMemberSupplierWrapperTest.java b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/VersionMaterialisedMemberSupplierWrapperTest.java
new file mode 100644
index 000000000..4513b5f12
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/java/be/vlaanderen/informatievlaanderen/ldes/ldio/config/VersionMaterialisedMemberSupplierWrapperTest.java
@@ -0,0 +1,49 @@
+package be.vlaanderen.informatievlaanderen.ldes.ldio.config;
+
+import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioLdesClientProperties;
+import be.vlaanderen.informatievlaanderen.ldes.ldio.config.wrappers.VersionMaterialisedMemberSupplierWrapper;
+import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
+import ldes.client.treenodesupplier.membersuppliers.MemberSupplier;
+import ldes.client.treenodesupplier.membersuppliers.VersionMaterialisedMemberSupplier;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class VersionMaterialisedMemberSupplierWrapperTest {
+ @Mock
+ private LdioLdesClientProperties ldioLdesClientProperties;
+ @Mock
+ private MemberSupplier baseSupplier;
+ private VersionMaterialisedMemberSupplierWrapper versionMaterialisedMemberSupplierWrapper;
+
+ @BeforeEach
+ void setUp() {
+ final EventStreamProperties eventStreamProperties = new EventStreamProperties("test", "test", "test", "test");
+ versionMaterialisedMemberSupplierWrapper = new VersionMaterialisedMemberSupplierWrapper(eventStreamProperties, ldioLdesClientProperties);
+ }
+
+ @Test
+ void given_VersionMaterialisationEnabled_when_wrap_then_ReturnVersionMaterialisedMemberSupplier() {
+ when(ldioLdesClientProperties.isVersionMaterialisationEnabled()).thenReturn(true);
+
+ final MemberSupplier memberSupplier = versionMaterialisedMemberSupplierWrapper.wrapMemberSupplier(baseSupplier);
+
+ assertThat(memberSupplier).isInstanceOf(VersionMaterialisedMemberSupplier.class);
+ }
+
+ @Test
+ void given_VersionMaterialisationDisabled_when_wrap_then_ReturnBaseMemberSupplier() {
+ when(ldioLdesClientProperties.isVersionMaterialisationEnabled()).thenReturn(false);
+
+ final MemberSupplier memberSupplier = versionMaterialisedMemberSupplierWrapper.wrapMemberSupplier(baseSupplier);
+
+ assertThat(memberSupplier).isSameAs(baseSupplier);
+ }
+
+}
\ No newline at end of file
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/__files/items-ldes.ttl b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/__files/items-ldes.ttl
new file mode 100644
index 000000000..da6967501
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/__files/items-ldes.ttl
@@ -0,0 +1,11 @@
+@prefix tree: .
+@prefix rdf: .
+@prefix ldes: .
+@prefix schema: .
+@prefix terms: .
+@prefix prov: .
+
+
+ rdf:type ldes:EventStream ;
+ ldes:timestampPath prov:generatedAtTime ;
+ ldes:versionOfPath terms:isVersionOf .
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/features/ldes-client-in.feature b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/features/ldes-client-in.feature
index 6a3617e39..edf8e9617 100644
--- a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/features/ldes-client-in.feature
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/features/ldes-client-in.feature
@@ -32,8 +32,6 @@ Feature: LdesClientIntegrationTest
And I want to add the following properties
| materialisation.enabled | true |
| materialisation.enable-latest-state | |
- | materialisation.version-of-property | http://purl.org/dc/terms/isVersionOf |
- | timestamp-path | http://www.w3.org/ns/prov#generatedAtTime |
And I configure this to be of RDF format "application/ld+json"
When I start an ldes-ldio-in component
Then All members from the stream are passed to the pipeline
diff --git a/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/mappings/items.json b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/mappings/items.json
new file mode 100644
index 000000000..8f339b456
--- /dev/null
+++ b/ldi-orchestrator/ldio-connectors/ldio-ldes-client/src/test/resources/mappings/items.json
@@ -0,0 +1,15 @@
+{
+ "request": {
+ "method": "GET",
+ "url": "/items"
+ },
+ "response": {
+ "status": 200,
+ "bodyFileName": "items-ldes.ttl",
+ "headers": {
+ "Content-Type": "text/turtle;charset=UTF-8",
+ "Cache-Control": "public, max-age=604800, immutable"
+ }
+ }
+}
+
diff --git a/pom.xml b/pom.xml
index 08ce598b1..b91ffc557 100644
--- a/pom.xml
+++ b/pom.xml
@@ -219,6 +219,11 @@
ldi-infra-sql
${project.version}
+
+ be.vlaanderen.informatievlaanderen.ldes.client
+ event-stream-properties-fetcher
+ ${project.version}
+