Skip to content

Commit

Permalink
feat: extract props instead of using config (#699)
Browse files Browse the repository at this point in the history
  • Loading branch information
jobulcke authored Oct 17, 2024
1 parent 38100f1 commit f84a3fd
Show file tree
Hide file tree
Showing 60 changed files with 1,649 additions and 425 deletions.
10 changes: 0 additions & 10 deletions docs/_ldio/ldio-inputs/ldio-ldes-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ within a fragment.
When the fragment is marked as immutable, and no members can be added anymore, the LDES Client will stop keeping track
of members processed within that fragment.

Members within a fragment can be processed in order of time based on a timestamp. The path to this timestamp needs to be
configured.
If the patch is missing, members will be processed in random order.

### Filtering

#### Exactly-once-filter
Expand Down Expand Up @@ -96,7 +92,6 @@ CPU ([source](https://www.sqlite.org/faq.html#q19)).
| _source-format_ | The 'Content-Type' that should be requested to the server | No | text/turtle | application/n-quads | Any type supported by [Apache Jena](https://jena.apache.org/documentation/io/rdf-input.html#determining-the-rdf-syntax) |
| _state_ | 'memory', 'sqlite' or 'postgres' to indicate how the state should be persisted | No | memory | sqlite | 'memory', 'sqlite' or 'postgres' |
| _keep-state_ | Indicates if the state should be persisted on shutdown (n/a for in memory states) | No | false | false | true or false |
| _timestamp-path_ | The property-path used to determine the timestamp on which the members will be ordered, and used for the `latest-state-filter` when enabled | No | N/A | http://www.w3.org/ns/prov#generatedAtTime | A property path |
| _enable-exactly-once_ | Indicates whether a member must be sent exactly once or at least once | No | true | true | true or false |

{: .note }
Expand All @@ -113,13 +108,8 @@ api
| Property | Description | Required | Default | Example | Supported values |
|:--------------------------------------|:--------------------------------------------------------------------------------------|:---------|:-------------------------------------|:-------------------------------------|:-----------------|
| _materialisation.enabled_ | Indicates if the client should return state-objects (true) or version-objects (false) | No | false | true | true or false |
| _materialisation.version-of-property_ | Property that points to the versionOfPath | No | http://purl.org/dc/terms/isVersionOf | http://purl.org/dc/terms/isVersionOf | true or false |
| _materialisation.enable-latest-state_ | Indicates whether all state or only the latest state must be sent | No | true | false | true or false |

{: .note }
Don't forgot to provide a timestamp-path in the general properties, as this property is not required, but necessary for
this filter to work properly!

{% include ldio-core/http-requester.md %}

### SQLite properties
Expand Down
16 changes: 16 additions & 0 deletions ldi-core/ldes-client/event-stream-properties-fetcher/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>be.vlaanderen.informatievlaanderen.ldes.client</groupId>
<artifactId>ldes-client</artifactId>
<version>2.10.0-SNAPSHOT</version>
</parent>

<artifactId>event-stream-properties-fetcher</artifactId>



</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ldes.client.eventstreamproperties;

import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutor;
import ldes.client.eventstreamproperties.services.StartingNodeSpecificationFactory;
import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
import ldes.client.eventstreamproperties.valueobjects.PropertiesRequest;
import ldes.client.eventstreamproperties.valueobjects.StartingNodeSpecification;
import org.apache.jena.riot.RDFParser;

import java.io.ByteArrayInputStream;

public class EventStreamPropertiesFetcher {
private final RequestExecutor requestExecutor;

public EventStreamPropertiesFetcher(RequestExecutor requestExecutor) {
this.requestExecutor = requestExecutor;
}

public EventStreamProperties fetchEventStreamProperties(PropertiesRequest request) {
final EventStreamProperties eventStreamProperties = executePropertiesRequest(request);

if(eventStreamProperties.containsRequiredProperties()) {
return eventStreamProperties;
}

return executePropertiesRequest(request.withUrl(eventStreamProperties.getUri()));

}

private EventStreamProperties executePropertiesRequest(PropertiesRequest request) {
return requestExecutor.execute(request.createRequest()).getBody()
.map(ByteArrayInputStream::new)
.map(body -> RDFParser.source(body).lang(request.lang()).toModel())
.map(StartingNodeSpecificationFactory::fromModel)
.map(StartingNodeSpecification::extractEventStreamProperties)
.orElseThrow();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ldes.client.eventstreamproperties.services;

import ldes.client.eventstreamproperties.valueobjects.StartingNodeSpecification;
import ldes.client.eventstreamproperties.valueobjects.TreeNodeSpecification;
import ldes.client.eventstreamproperties.valueobjects.ViewSpecification;
import org.apache.jena.rdf.model.Model;

public class StartingNodeSpecificationFactory {
private StartingNodeSpecificationFactory() {
}

public static StartingNodeSpecification fromModel(Model model) {
if (TreeNodeSpecification.isTreeNode(model)) {
return new TreeNodeSpecification(model);
}
if (ViewSpecification.isViewSpecification(model)) {
return new ViewSpecification(model);
}
throw new IllegalStateException("The provided starting node must contain either a dcterms:isPartOf property or the ldes:versionOfPath and ldes:timestampPath properties");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ldes.client.eventstreamproperties.valueobjects;

public final class EventStreamProperties {
private final String uri;
private final String versionOfPath;
private final String timestampPath;
private final String shaclShapeUri;

public EventStreamProperties(String uri) {
this(uri, null, null, null);
}

public EventStreamProperties(String uri, String versionOfPath, String timestampPath, String shaclShapeUri) {
this.uri = uri;
this.versionOfPath = versionOfPath;
this.timestampPath = timestampPath;
this.shaclShapeUri = shaclShapeUri;
}

public String getUri() {
return uri;
}

public String getVersionOfPath() {
return versionOfPath;
}

public String getTimestampPath() {
return timestampPath;
}

public String getShaclShapeUri() {
return shaclShapeUri;
}

public boolean containsRequiredProperties() {
return timestampPath != null && versionOfPath != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ldes.client.eventstreamproperties.valueobjects;

import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.valueobjects.GetRequest;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.valueobjects.Request;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.valueobjects.RequestHeader;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.valueobjects.RequestHeaders;
import org.apache.http.HttpHeaders;
import org.apache.jena.riot.Lang;

import java.util.List;

public record PropertiesRequest(String url, Lang lang) {

public Request createRequest() {
RequestHeaders requestHeaders = new RequestHeaders(List.of(
new RequestHeader(HttpHeaders.ACCEPT, lang.getHeaderString())
));
return new GetRequest(url, requestHeaders);
}

public PropertiesRequest withUrl(String url) {
return new PropertiesRequest(url, lang);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package ldes.client.eventstreamproperties.valueobjects;

public interface StartingNodeSpecification {
EventStreamProperties extractEventStreamProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package ldes.client.eventstreamproperties.valueobjects;

import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.RDFNode;

import java.util.Optional;

import static org.apache.jena.rdf.model.ResourceFactory.createProperty;

public class TreeNodeSpecification implements StartingNodeSpecification {
public static final String DC_TERMS = "http://purl.org/dc/terms/";
public static final Property IS_PART_OF = createProperty(DC_TERMS, "isPartOf");
private final Model model;

public TreeNodeSpecification(Model model) {
this.model = model;
}

@Override
public EventStreamProperties extractEventStreamProperties() {
return extractTreeNode(model).filter(RDFNode::isURIResource)
.map(RDFNode::asResource)
.map(node -> new EventStreamProperties(node.getURI()))
.orElseThrow();
}

public static boolean isTreeNode(Model model) {
return extractTreeNode(model).isPresent();
}

private static Optional<RDFNode> extractTreeNode(Model model) {
return model.listObjectsOfProperty(IS_PART_OF).nextOptional();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package ldes.client.eventstreamproperties.valueobjects;

import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.Resource;

import java.util.Optional;

import static org.apache.jena.rdf.model.ResourceFactory.createProperty;

public class ViewSpecification implements StartingNodeSpecification {
public static final String LDES = "https://w3id.org/ldes#";
public static final Property LDES_EVENT_STREAM = createProperty(LDES, "EventStream");
public static final String RDF_SYNTAX = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";
public static final Property RDF_SYNTAX_TYPE = createProperty(RDF_SYNTAX, "type");
public static final Property TREE_SHAPE = createProperty("https://w3id.org/tree#", "shape");
public static final Property LDES_VERSION_OF_PATH = createProperty(LDES, "versionOfPath");
public static final Property LDES_TIMESTAMP_PATH = createProperty(LDES, "timestampPath");

private final Model model;

public ViewSpecification(Model model) {
this.model = model;
}

@Override
public EventStreamProperties extractEventStreamProperties() {
final Resource subject = extractEventStream(model).orElseThrow();
final String shapeUri = Optional.ofNullable(subject.getPropertyResourceValue(TREE_SHAPE))
.map(Resource::getURI)
.orElse("");
return new EventStreamProperties(
subject.getURI(),
subject.getPropertyResourceValue(LDES_VERSION_OF_PATH).getURI(),
subject.getPropertyResourceValue(LDES_TIMESTAMP_PATH).getURI(),
shapeUri
);
}

public static boolean isViewSpecification(Model model) {
return extractEventStream(model).isPresent();
}

private static Optional<Resource> extractEventStream(Model model) {
return model.listSubjectsWithProperty(RDF_SYNTAX_TYPE, LDES_EVENT_STREAM).nextOptional();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package ldes.client.eventstreamproperties;

import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.services.RequestExecutorFactory;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import ldes.client.eventstreamproperties.valueobjects.EventStreamProperties;
import ldes.client.eventstreamproperties.valueobjects.PropertiesRequest;
import org.apache.jena.riot.Lang;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;

import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static org.assertj.core.api.Assertions.assertThat;

@WireMockTest(httpPort = EventStreamPropertiesFetcherTest.WIREMOCK_PORT)
class EventStreamPropertiesFetcherTest {
public static final int WIREMOCK_PORT = 12121;
private static final EventStreamProperties eventStreamProperties = new EventStreamProperties(
"http://localhost:12121/observations",
"http://purl.org/dc/terms/isVersionOf",
"http://www.w3.org/ns/prov#generatedAtTime",
""
);
private EventStreamPropertiesFetcher fetcher;

@BeforeEach
void setUp() {
RequestExecutor requestExecutor = new RequestExecutorFactory().createNoAuthExecutor();
fetcher = new EventStreamPropertiesFetcher(requestExecutor);
}

@Test
void given_EventStream_when_FetchProperties_then_ReturnValidProperties() throws IOException, URISyntaxException {
URL resource = getClass().getClassLoader().getResource("models/eventstream.ttl");
final byte[] responseBytes = Files.readAllBytes(Path.of(Objects.requireNonNull(resource).toURI()));
stubFor(get("/observations").willReturn(ok().withBody(responseBytes)));

final EventStreamProperties properties = fetcher.fetchEventStreamProperties(new PropertiesRequest("http://localhost:12121/observations", Lang.TTL));

verify(getRequestedFor(urlEqualTo("/observations")));
assertThat(properties)
.usingRecursiveComparison()
.isEqualTo(eventStreamProperties);
}

@Test
void given_View_when_FetchProperties_then_ReturnValidProperties() throws IOException, URISyntaxException {
URL resource = getClass().getClassLoader().getResource("models/view.ttl");
final byte[] responseBytes = Files.readAllBytes(Path.of(Objects.requireNonNull(resource).toURI()));
stubFor(get("/observations/by-page").willReturn(ok().withBody(responseBytes)));

final EventStreamProperties properties = fetcher.fetchEventStreamProperties(new PropertiesRequest("http://localhost:12121/observations/by-page", Lang.TTL));

verify(getRequestedFor(urlEqualTo("/observations/by-page")));
assertThat(properties)
.usingRecursiveComparison()
.isEqualTo(eventStreamProperties);
}

@Test
void given_Fragment_when_FetchProperties_then_ReturnValidProperties() throws IOException, URISyntaxException {
URL treeNodeResource = getClass().getClassLoader().getResource("models/treenode.ttl");
final byte[] treeNodeBody = Files.readAllBytes(Path.of(Objects.requireNonNull(treeNodeResource).toURI()));
stubFor(get("/observations/by-page?pageNumber=1").willReturn(ok().withBody(treeNodeBody)));
URL eventSourceResource = getClass().getClassLoader().getResource("models/eventstream.ttl");
final byte[] eventStreamBody = Files.readAllBytes(Path.of(Objects.requireNonNull(eventSourceResource).toURI()));
stubFor(get("/observations").willReturn(ok().withBody(eventStreamBody)));

final EventStreamProperties properties = fetcher.fetchEventStreamProperties(new PropertiesRequest("http://localhost:12121/observations/by-page?pageNumber=1", Lang.TTL));

verify(getRequestedFor(urlEqualTo("/observations/by-page?pageNumber=1")));
verify(getRequestedFor(urlEqualTo("/observations")));
assertThat(properties)
.usingRecursiveComparison()
.isEqualTo(eventStreamProperties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ldes.client.eventstreamproperties.services;

import org.apache.jena.rdf.model.Model;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

class StartingNodeSpecificationFactoryTest {
@Test
void given_invalidModel_when_FromModel_then_ThrowException() {
String invalid = "<http://localhost:12121/observations/by-page?pageNumber=1> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/dc/terms/Standard> .";
Model model = RDFParser.fromString(invalid).lang(Lang.NQ).toModel();

assertThatThrownBy(() -> StartingNodeSpecificationFactory.fromModel(model))
.isInstanceOf(IllegalStateException.class)
.hasMessage("The provided starting node must contain either a dcterms:isPartOf property or the ldes:versionOfPath and ldes:timestampPath properties");
}
}
Loading

0 comments on commit f84a3fd

Please sign in to comment.