Skip to content

Commit

Permalink
fix: clean up ldi inputs ctors (#590)
Browse files Browse the repository at this point in the history
  • Loading branch information
jobulcke authored Apr 17, 2024
1 parent eb9a8a0 commit 93786c6
Show file tree
Hide file tree
Showing 22 changed files with 303 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import be.vlaanderen.informatievlaanderen.ldes.ldi.services.ComponentExecutor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.types.LdiAdapter;
import be.vlaanderen.informatievlaanderen.ldes.ldio.types.LdioInput;
import be.vlaanderen.informatievlaanderen.ldes.ldio.types.LdioObserver;
import org.springframework.context.ApplicationEventPublisher;

import static be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects.PipelineStatusTrigger.START;
Expand All @@ -11,7 +12,7 @@ public class DummyIn extends LdioInput {
private int counter = 0;

public DummyIn(ComponentExecutor executor, LdiAdapter adapter, ApplicationEventPublisher applicationEventPublisher) {
super("DummyIn", "test", executor, adapter, null, applicationEventPublisher);
super(executor, adapter, LdioObserver.register("DummyIn", "test", null), applicationEventPublisher);
this.updateStatus(START);
}

Expand Down
7 changes: 5 additions & 2 deletions ldi-orchestrator/ldio-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.11.5</version>
<scope>compile</scope>
<version>${micrometer-prometheus.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand All @@ -39,6 +38,10 @@
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@
import be.vlaanderen.informatievlaanderen.ldes.ldi.services.ComponentExecutor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.types.LdiAdapter;
import be.vlaanderen.informatievlaanderen.ldes.ldi.types.LdiComponent;
import be.vlaanderen.informatievlaanderen.ldes.ldio.config.ObserveConfiguration;
import be.vlaanderen.informatievlaanderen.ldes.ldio.events.PipelineStatusEvent;
import be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects.PipelineStatus;
import be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects.PipelineStatusTrigger;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import org.apache.jena.rdf.model.Model;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

import static be.vlaanderen.informatievlaanderen.ldes.ldio.config.PipelineConfig.PIPELINE_NAME;
import java.util.function.Supplier;

import static be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects.PipelineStatus.*;
import static be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects.PipelineStatusTrigger.START;
import static be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects.StatusChangeSource.MANUAL;
Expand All @@ -28,17 +25,12 @@
* onto the LDIO pipeline
*/
public abstract class LdioInput implements LdiComponent {

protected final String componentName;
protected final String pipelineName;
private final ComponentExecutor executor;
private final LdiAdapter adapter;
private final LdioObserver ldioObserver;
private final ApplicationEventPublisher applicationEventPublisher;
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ObservationRegistry observationRegistry;

private static final String LDIO_DATA_IN = "ldio_data_in";
private static final String LDIO_COMPONENT_NAME = "ldio_type";
private PipelineStatus pipelineStatus;

/**
Expand All @@ -49,48 +41,26 @@ public abstract class LdioInput implements LdiComponent {
* @param adapter Instance of the LDI Adapter. Facilitates transforming the input
* data to a linked data model (RDF).
*/
protected LdioInput(String componentName, String pipelineName, ComponentExecutor executor, LdiAdapter adapter,
ObservationRegistry observationRegistry, ApplicationEventPublisher applicationEventPublisher) {
this.componentName = componentName;
this.pipelineName = pipelineName;
protected LdioInput(ComponentExecutor executor, LdiAdapter adapter, LdioObserver ldioObserver, ApplicationEventPublisher applicationEventPublisher) {
this.executor = executor;
this.adapter = adapter;
this.observationRegistry = observationRegistry;
this.ldioObserver = ldioObserver;
this.applicationEventPublisher = applicationEventPublisher;
this.pipelineStatus = INIT;
Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).increment(0);
}

public void processInput(String content, String contentType) {
processInput(LdiAdapter.Content.of(content, contentType));
}

public void processInput(LdiAdapter.Content content) {
Observation.createNotStarted(this.componentName, observationRegistry)
.observe(() -> {
try {
adapter.apply(content).forEach(this::processModel);
} catch (Exception e) {
final var errorLocation = this.pipelineName + ":processInput";
log.atError().log(ObserveConfiguration.ERROR_TEMPLATE, errorLocation, e.getMessage());
log.atError().log(ObserveConfiguration.ERROR_TEMPLATE, errorLocation,
"Processing below message.%n%n###mime###%n%s%n###content###%n%s".formatted(content.mimeType(), content.content()));
throw e;
}
});
final Supplier<String> failedContentLogSupplier = () -> "Processing below message.%n%n###mime###%n%s%n###content###%n%s".formatted(content.mimeType(), content.content());
ldioObserver.observe(() -> adapter.apply(content).forEach(this::processModel), "processInput", failedContentLogSupplier);
}

protected void processModel(Model model) {
Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).increment();
Observation.createNotStarted(this.componentName, observationRegistry)
.observe(() -> {
try {
executor.transformLinkedData(model);
} catch (Exception e) {
log.atError().log(ObserveConfiguration.ERROR_TEMPLATE, this.pipelineName + ":processModel", e.getMessage());
throw e;
}
});
ldioObserver.increment();
ldioObserver.observe(() -> executor.transformLinkedData(model), "processModel");
}

public abstract void shutdown();
Expand All @@ -109,10 +79,10 @@ public PipelineStatus updateStatus(PipelineStatusTrigger trigger) {
}
}
case STOP -> this.pipelineStatus = STOPPED;
default -> log.warn("Unhandled status update on pipeline: {} for status: {}", pipelineName, pipelineStatus);
default -> log.warn("Unhandled status update on pipeline: {} for status: {}", ldioObserver.getPipelineName(), pipelineStatus);
}

applicationEventPublisher.publishEvent(new PipelineStatusEvent(pipelineName, this.pipelineStatus, MANUAL));
applicationEventPublisher.publishEvent(new PipelineStatusEvent(ldioObserver.getPipelineName(), this.pipelineStatus, MANUAL));
return this.pipelineStatus;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package be.vlaanderen.informatievlaanderen.ldes.ldio.types;

import be.vlaanderen.informatievlaanderen.ldes.ldio.config.ObserveConfiguration;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.function.Supplier;

import static be.vlaanderen.informatievlaanderen.ldes.ldio.config.PipelineConfig.PIPELINE_NAME;

public class LdioObserver {
private static final String LDIO_DATA_IN = "ldio_data_in";
private static final String LDIO_COMPONENT_NAME = "ldio_type";
private static final Logger log = LoggerFactory.getLogger(LdioObserver.class);
private final String componentName;
private final String pipelineName;
private final ObservationRegistry observationRegistry;

private LdioObserver(String componentName, String pipelineName, ObservationRegistry observationRegistry) {
this.componentName = componentName;
this.pipelineName = pipelineName;
this.observationRegistry = observationRegistry;
}

@SafeVarargs
public final void observe(Runnable observable, String location, Supplier<String>... additionalLoggingContent) {
final String errorLocation = pipelineName + ":" + location;
Observation.createNotStarted(this.componentName, observationRegistry)
.observe(() -> {
try {
observable.run();
} catch (Exception e) {
log.atError().log(ObserveConfiguration.ERROR_TEMPLATE, errorLocation, e.getMessage());
Arrays.stream(additionalLoggingContent)
.forEach(content ->
log.atError().log(ObserveConfiguration.ERROR_TEMPLATE, errorLocation, content.get())
);
throw e;
}
});
}

public void increment() {
Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).increment();
}

public static LdioObserver register(String componentName, String pipelineName, ObservationRegistry observationRegistry) {
Metrics.counter(LDIO_DATA_IN, PIPELINE_NAME, pipelineName, LDIO_COMPONENT_NAME, componentName).increment(0);
return new LdioObserver(componentName, pipelineName, observationRegistry);
}

public String getPipelineName() {
return pipelineName;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package be.vlaanderen.informatievlaanderen.ldes.ldio.types;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.*;

class LdioObserverTest {
private static final String COMPONENT_NAME = "Ldio:ComponentName";
private static final String PIPELINE_NAME = "pipeline-name";
private LdioObserver ldioObserver;

@BeforeEach
void setUp() {
ldioObserver = LdioObserver.register(COMPONENT_NAME, PIPELINE_NAME, null);
}

@Test
void when_ObservableDoesNotThrowException_then_DoNotLog() {
final Supplier<String> additionalLogSupplier = mock();
final Runnable observable = () -> {};

ldioObserver.observe(observable, "test", additionalLogSupplier);

verifyNoInteractions(additionalLogSupplier);
}

@Test
void when_ObservableDoesThrowsException_then_Log() {
final Supplier<String> additionalLogSupplier = mock();
final Runnable observable = () -> {
throw new RuntimeException();
};

assertThatThrownBy(() -> ldioObserver.observe(observable, "test", additionalLogSupplier))
.isInstanceOf(RuntimeException.class);

verify(additionalLogSupplier).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import be.vlaanderen.informatievlaanderen.ldes.ldi.services.ComponentExecutor;
import be.vlaanderen.informatievlaanderen.ldes.ldi.types.LdiAdapter;
import be.vlaanderen.informatievlaanderen.ldes.ldio.config.JmsConfig;
import be.vlaanderen.informatievlaanderen.ldes.ldio.config.LdioAmqpInRegistrator;
import be.vlaanderen.informatievlaanderen.ldes.ldio.exceptions.InvalidAmqpMessageException;
import be.vlaanderen.informatievlaanderen.ldes.ldio.types.LdioInput;
import io.micrometer.observation.ObservationRegistry;
import be.vlaanderen.informatievlaanderen.ldes.ldio.types.LdioObserver;
import be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects.LdioAmpqInProperties;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
Expand All @@ -21,30 +21,29 @@ public class LdioAmqpIn extends LdioInput implements MessageListener {
public static final String NAME = "Ldio:AmqpIn";
private static final Logger log = LoggerFactory.getLogger(LdioAmqpIn.class);
private final LdioAmqpInRegistrator ldioAmqpInRegistrator;
private final LdioAmpqInProperties properties;
private final String listenerId;
private final String defaultContentType;

/**
* Creates a LdiInput with its Component Executor and LDI Adapter
*
* @param pipelineName Unique identifier for the pipeline.
* @param executor Instance of the Component Executor. Allows the LDI Input to pass
* data on the pipeline
* @param adapter Instance of the LDI Adapter. Facilitates transforming the input
* data to a linked data model (RDF).
* @param jmsConfig Configuration class containing the necessary info to spin up a listener
* @param ldioObserver Instance of the LDIO Observer, for observation, logging and monitoring reaons
* @param jmsInRegistrator Global service to maintain JMS listeners.
* @param observationRegistry
* @param properties Configuration properties
*/
public LdioAmqpIn(String pipelineName, ComponentExecutor executor, LdiAdapter adapter,
String defaultContentType, JmsConfig jmsConfig, LdioAmqpInRegistrator jmsInRegistrator,
ObservationRegistry observationRegistry, ApplicationEventPublisher applicationEventPublisher) {
super(NAME, pipelineName, executor, adapter, observationRegistry, applicationEventPublisher);
this.defaultContentType = defaultContentType;
SimpleJmsListenerEndpoint endpoint = listenerEndpoint(jmsConfig.queue());
public LdioAmqpIn(ComponentExecutor executor, LdiAdapter adapter, LdioObserver ldioObserver,
LdioAmqpInRegistrator jmsInRegistrator, LdioAmpqInProperties properties,
ApplicationEventPublisher applicationEventPublisher) {
super(executor, adapter, ldioObserver, applicationEventPublisher);
this.properties = properties;
SimpleJmsListenerEndpoint endpoint = listenerEndpoint(properties.jmsConfig().queue());
ldioAmqpInRegistrator = jmsInRegistrator;
listenerId = endpoint.getId();
jmsInRegistrator.registerListener(jmsConfig, endpoint);
jmsInRegistrator.registerListener(properties.jmsConfig(), endpoint);
this.start();
}

Expand All @@ -53,7 +52,7 @@ public void onMessage(Message message) {
final LdiAdapter.Content content;
try {
String contentTypeProperty = message.getStringProperty(CONTENT_TYPE_HEADER);
String contentType = contentTypeProperty != null ? contentTypeProperty : defaultContentType;
String contentType = contentTypeProperty != null ? contentTypeProperty : properties.defaultContentType();
content = LdiAdapter.Content.of(message.getBody(String.class), contentType);
} catch (JMSException e) {
throw new InvalidAmqpMessageException(e);
Expand All @@ -64,7 +63,7 @@ public void onMessage(Message message) {

private SimpleJmsListenerEndpoint listenerEndpoint(String queue) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId(pipelineName);
endpoint.setId(properties.pipelineName());
endpoint.setDestination(queue);
endpoint.setMessageListener(this);
return endpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import be.vlaanderen.informatievlaanderen.ldes.ldio.LdioAmqpIn;
import be.vlaanderen.informatievlaanderen.ldes.ldio.configurator.LdioInputConfigurator;
import be.vlaanderen.informatievlaanderen.ldes.ldio.types.LdioInput;
import be.vlaanderen.informatievlaanderen.ldes.ldio.types.LdioObserver;
import be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects.ComponentProperties;
import be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects.LdioAmpqInProperties;
import io.micrometer.observation.ObservationRegistry;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFLanguages;
Expand Down Expand Up @@ -38,14 +40,13 @@ public LdioJmsInConfigurator(LdioAmqpInRegistrator ldioAmqpInRegistrator, Observ

@Override
public LdioInput configure(LdiAdapter adapter, ComponentExecutor executor, ApplicationEventPublisher applicationEventPublisher, ComponentProperties config) {
String pipelineName = config.getPipelineName();

String remoteUrl = new RemoteUrlExtractor(config).getRemoteUrl();
JmsConfig jmsConfig = new JmsConfig(config.getProperty(USERNAME), config.getProperty(PASSWORD),
final String pipelineName = config.getPipelineName();
final String remoteUrl = new RemoteUrlExtractor(config).getRemoteUrl();
final JmsConfig jmsConfig = new JmsConfig(config.getProperty(USERNAME), config.getProperty(PASSWORD),
remoteUrl, config.getProperty(QUEUE));

return new LdioAmqpIn(pipelineName, executor, adapter, getContentType(config), jmsConfig, ldioAmqpInRegistrator,
observationRegistry, applicationEventPublisher);
final LdioAmpqInProperties properties = new LdioAmpqInProperties(pipelineName, getContentType(config), jmsConfig);
final LdioObserver ldioObserver = LdioObserver.register(LdioAmqpIn.NAME, pipelineName, observationRegistry);
return new LdioAmqpIn(executor, adapter, ldioObserver, ldioAmqpInRegistrator, properties, applicationEventPublisher);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package be.vlaanderen.informatievlaanderen.ldes.ldio.valueobjects;

import be.vlaanderen.informatievlaanderen.ldes.ldio.config.JmsConfig;

public record LdioAmpqInProperties(String pipelineName, String defaultContentType, JmsConfig jmsConfig) {
}
Loading

0 comments on commit 93786c6

Please sign in to comment.