Skip to content

Commit

Permalink
Feat: move ldio status to input (#510)
Browse files Browse the repository at this point in the history
Co-authored-by: Yalz <[email protected]>
  • Loading branch information
pj-cegeka and Yalz committed Feb 29, 2024
1 parent 1a38a23 commit 4a41889
Show file tree
Hide file tree
Showing 88 changed files with 1,003 additions and 366 deletions.
6 changes: 5 additions & 1 deletion docs/_ldio/ldio-inputs/ldio-amqp-in.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ an [AMQP 1.0 queue](https://www.amqp.org/resources/specifications).
password: artemis
queue: example
content-type: application/ld+json
```
```
## Pausing
When paused, this component will not receive any messages from the queue and will start syncing with the queue when unpaused.
7 changes: 6 additions & 1 deletion docs/_ldio/ldio-inputs/ldio-archive-file-in.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ Please refer to the [core documentation](../../core/ldi-inputs/file-archiving) f
| Property | Description | Required | Default | Example | Supported values |
|:-----------------|:----------------------------------|:---------|:----------------------------|:-----------------|:--------------------------------|
| archive-root-dir | The root directory of the archive | Yes | N/A | /parcels/archive | Linux (+ Mac) and Windows paths |
| source-format | The source format of the files | No | Deduced from file extension | text/turtle | Any Jena supported format |
| source-format | The source format of the files | No | Deduced from file extension | text/turtle | Any Jena supported format |

## Pausing

When paused, this component will stop reading from the archive.
When resumed, it will pick up where it left of, ignoring any changes to the file structure.
5 changes: 5 additions & 0 deletions docs/_ldio/ldio-inputs/ldio-http-in-poller.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ config:
When using multiple endpoints, the other config (auth config, interval, etc.) applies to all endpoints.
## Pausing
When paused, this component will stop making any of the scheduled HTTP-calls.
When resumed, it will restart these calls as if the component had been restarted, meaning any configured periods will start counting from the moment the pipeline was resumed instead of when it was originally created.
----
[^1]: Either choose the 'cron' option or the 'interval'. However, **the interval property will become deprecated**.
Expand Down
6 changes: 5 additions & 1 deletion docs/_ldio/ldio-inputs/ldio-http-in.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ Data can be written to ``http://{hostname}:{port}/{pipeline name}``

## Config

This component has no required config
This component has no required config

## Pausing

When paused, this component will return an 503 response to any HTTP-calls it receives
5 changes: 5 additions & 0 deletions docs/_ldio/ldio-inputs/ldio-kafka-in.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ outputs:
sasl-jaas-user: client
sasl-jaas-password: client-secret
```
## Pausing
When paused, this component will stop listening to the kafka topics.
When resumed, it will try to resync with all topics.
6 changes: 6 additions & 0 deletions docs/_ldio/ldio-inputs/ldio-ldes-client-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,9 @@ input:
proxy-url-replacement: http://consumer-connector:29291/public
source-format: application/n-quads
```
## Pausing
When paused, this component will stop processing the current fragment and not make any calls to the server.
When resumed, it will continue with the fragment where it stopped and continue as normal.
This component can not be paused while waiting for the token.
10 changes: 9 additions & 1 deletion docs/_ldio/ldio-inputs/ldio-ldes-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ An LDIO wrapper component for the [LDI LDES Client building block](../../core/ld
This component uses the "LDIO Http Requester" to make the HTTP request.
Refer to [LDIO Http Requester](../ldio-core) for the config.

> **_NOTE:_** Setting the keep-state property to true makes it so that the state can not be deleted through the pipeline-management api

## Examples

```yaml
Expand Down Expand Up @@ -74,4 +77,9 @@ Refer to [LDIO Http Requester](../ldio-core) for the config.
url: jdbc:postgresql://test.postgres.database.azure.com:5432/sample
username: myUsername@test
password: myPassword
```
```
## Pausing
When paused, this component will stop processing the current fragment and not make any calls to the server.
When resumed, it will continue with the fragment where it stopped and continue as normal.
18 changes: 17 additions & 1 deletion docs/_ldio/pipeline-management/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,20 @@ orchestrator:
If this directory does not exist, it will be created.

> **_NOTE:_** An application config can be defined by creating an application YAML file in the LDIO directory
(in docker, this correlates to `/ldio/application.yml`).
(in docker, this correlates to `/ldio/application.yml`).


## Pausing & Resuming LDIO

Sometimes it might be preferred to pause an LDIO pipeline instead of deleting and recreating it.
To pause a pipeline, simply call the following endpoint:
````
{base-url}/admin/api/v1/pipeline/{pipeline-name}/halt
````
And to resume a paused pipeline:
````
{base-url}/admin/api/v1/pipeline/{pipeline-name}/resume
````

The exact behaviour of a paused pipeline depends on its input component and can be found in the [documentation of these components](docs/_ldio/ldio-inputs/index.md).
However, it will always complete its current run through the pipeline and then seize sending any output.
5 changes: 4 additions & 1 deletion docs/_ldio/pipeline-management/pipeline-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,7 @@ A deletion of a pipeline can be achieved by performing a DELETE request to the

````bash
curl --location --request DELETE 'http://localhost:8080/admin/api/v1/pipeline/my-first-pipeline'
````
````

For stateful components like the Ldio ldes-client it is possible to persist the state on the removal of the pipeline.
Keeping the state is the default option. If you want to specifically delete the state simply send false in the request body of the delete statement.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.services.RequestExecutorFactory;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import ldes.client.treenoderelationsfetcher.domain.valueobjects.LdesRelation;
import ldes.client.treenoderelationsfetcher.domain.valueobjects.LdesStructure;
import org.apache.commons.io.FileUtils;
Expand All @@ -17,6 +16,8 @@
import java.util.Objects;
import java.util.stream.Stream;

import com.github.tomakehurst.wiremock.junit5.WireMockTest;

import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public interface MemberSupplier extends Supplier<SuppliedMember> {

@Override
SuppliedMember get();

void init();
void destroyState();

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ public class MemberSupplierImpl implements MemberSupplier {
public MemberSupplierImpl(TreeNodeProcessor treeNodeProcessor, boolean keepState) {
this.treeNodeProcessor = treeNodeProcessor;
this.keepState = keepState;
Runtime.getRuntime().addShutdownHook(new Thread(this::destroyState));
}

@Override
Expand All @@ -25,4 +24,8 @@ public void destroyState() {
}
}

@Override
public void init() {
treeNodeProcessor.init();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ public TreeNodeProcessor(LdesMetaData ldesMetaData, StatePersistence statePersis
this.ldesMetaData = ldesMetaData;
}

public SuppliedMember getMember() {
savePreviousState();
public void init() {
if (!treeNodeRecordRepository.containsTreeNodeRecords()) {
initializeTreeNodeRecordRepository();
}
}

public SuppliedMember getMember() {
savePreviousState();

Optional<MemberRecord> unprocessedTreeMember = memberRepository.getUnprocessedTreeMember();
while (unprocessedTreeMember.isEmpty()) {
processTreeNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public void destroyState() {
memberSupplier.destroyState();
}

@Override
public void init() {
memberSupplier.init();
}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package ldes.client.performance;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.extension.responsetemplating.ResponseTemplateTransformer;
import ldes.client.performance.csvwriter.CsvFile;
import ldes.client.treenodesupplier.TreeNodeProcessor;
import org.junit.jupiter.api.*;
Expand All @@ -11,6 +8,10 @@
import java.time.temporal.ChronoUnit;
import java.util.List;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.extension.responsetemplating.ResponseTemplateTransformer;

import static org.apache.commons.io.FilenameUtils.separatorsToSystem;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package be.vlaanderen.informatievlaanderen.ldes.ldi.rdf.formatter;

import com.github.jsonldjava.core.JsonLdOptions;
import org.apache.jena.atlas.json.JSON;
import org.apache.jena.atlas.json.JsonObject;
import org.apache.jena.rdf.model.Model;
Expand All @@ -10,6 +9,8 @@

import java.io.OutputStream;

import com.github.jsonldjava.core.JsonLdOptions;

import static be.vlaanderen.informatievlaanderen.ldes.ldi.rdf.formatter.PrefixAdder.addPrefixesToModel;

public class JsonLdFrameWriter implements LdiRdfWriter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package be.vlaanderen.informatievlaanderen.ldes.ldi.rdf.parser;

import com.apicatalog.jsonld.JsonLdOptions;
import com.apicatalog.jsonld.context.cache.LruCache;
import org.apache.jena.riot.RIOT;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.util.ContextAccumulator;

import com.apicatalog.jsonld.JsonLdOptions;
import com.apicatalog.jsonld.context.cache.LruCache;

import static org.apache.jena.riot.lang.LangJSONLD11.JSONLD_OPTIONS;

public class JenaContextProvider {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package be.vlaanderen.informatievlaanderen.ldes.ldi.rdf.parser;

import org.junit.jupiter.api.Test;

import com.apicatalog.jsonld.JsonLdOptions;
import com.apicatalog.jsonld.context.cache.Cache;
import com.apicatalog.jsonld.document.Document;
import org.junit.jupiter.api.Test;

import static org.apache.jena.riot.lang.LangJSONLD11.JSONLD_OPTIONS;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
import be.vlaanderen.informatievlaanderen.ldes.ldi.rdf.parser.JenaContextProvider;
import be.vlaanderen.informatievlaanderen.ldes.ldi.valuobjects.properties.LinkedDataAttribute;
import be.vlaanderen.informatievlaanderen.ldes.ldi.valuobjects.valueproperties.DateTimeValue;
import com.fasterxml.jackson.annotation.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.riot.Lang;
Expand All @@ -17,6 +14,10 @@
import java.util.List;
import java.util.Objects;

import com.fasterxml.jackson.annotation.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import static be.vlaanderen.informatievlaanderen.ldes.ldi.config.NgsiV2ToLdMapping.*;
import static be.vlaanderen.informatievlaanderen.ldes.ldi.services.NgsiLdURIParser.toNgsiLdUri;

Expand Down
9 changes: 5 additions & 4 deletions ldi-core/rdf-adapter/src/test/java/RdfAdapterTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import be.vlaanderen.informatievlaanderen.ldes.ldi.RdfAdapter;
import be.vlaanderen.informatievlaanderen.ldes.ldi.types.LdiAdapter;
import com.apicatalog.jsonld.JsonLdOptions;
import com.apicatalog.jsonld.context.cache.LruCache;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParserBuilder;
Expand All @@ -12,6 +8,11 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.apicatalog.jsonld.JsonLdOptions;
import com.apicatalog.jsonld.context.cache.LruCache;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;

import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.apache.jena.riot.lang.LangJSONLD11.JSONLD_OPTIONS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@

import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.requestexecutor.executor.RequestExecutorSupplier;
import org.apache.http.Header;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

import java.util.Collection;

import com.github.scribejava.core.builder.ServiceBuilder;
import com.github.scribejava.core.builder.api.DefaultApi20;
import com.github.scribejava.core.oauth.OAuth20Service;
import com.github.scribejava.core.oauth2.clientauthentication.ClientAuthentication;
import com.github.scribejava.core.oauth2.clientauthentication.RequestBodyAuthenticationScheme;
import com.github.scribejava.httpclient.apache.ApacheHttpClient;
import org.apache.http.Header;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

import java.util.Collection;

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.commons.lang3.Validate.notNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class LdesClientProcessor extends AbstractProcessor {
private final RequestExecutorFactory requestExecutorFactory = new RequestExecutorFactory();
private final StatePersistenceFactory statePersistenceFactory = new StatePersistenceFactory();
private boolean hasLdesEnded;
private boolean keepState;

@Override
public Set<Relationship> getRelationships() {
Expand Down Expand Up @@ -89,7 +90,7 @@ public void onScheduled(final ProcessContext context) {
TimestampExtractor timestampExtractor = timestampPath.isBlank() ? new TimestampFromCurrentTimeExtractor() :
new TimestampFromPathExtractor(createProperty(timestampPath));
TreeNodeProcessor treeNodeProcessor = new TreeNodeProcessor(ldesMetaData, statePersistence, requestExecutor, timestampExtractor);
boolean keepState = stateKept(context);
keepState = stateKept(context);
if (useVersionMaterialisation(context)) {
final var versionOfProperty = createProperty(getVersionOfProperty(context));
final var versionMaterialiser = new VersionMaterialiser(versionOfProperty, restrictToMembers(context));
Expand All @@ -101,6 +102,7 @@ public void onScheduled(final ProcessContext context) {
memberSupplier = new MemberSupplierImpl(treeNodeProcessor, keepState);
}

memberSupplier.init();
determineLdesProperties(ldesMetaData, requestExecutor, context);

LOGGER.info("LDES Client processor {} configured to follow (sub)streams {} (expected LDES source format: {})",
Expand Down Expand Up @@ -180,7 +182,9 @@ private void processNextMember(ProcessContext context, ProcessSession session) {

@OnRemoved
public void onRemoved() {
memberSupplier.destroyState();
if (!keepState) {
memberSupplier.destroyState();
}
}

public static String convertModelToString(Model model, Lang dataDestinationFormat) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package be.vlaanderen.informatievlaanderen.ldes.ldi.processors;

import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
Expand All @@ -20,6 +18,9 @@
import java.util.Map;
import java.util.stream.Stream;

import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.junit5.WireMockTest;

import static be.vlaanderen.informatievlaanderen.ldes.ldi.processors.config.LdesProcessorRelationships.DATA_RELATIONSHIP;
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
Expand Down
Loading

0 comments on commit 4a41889

Please sign in to comment.