Skip to content

Commit

Permalink
event config is separated from command config.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsilaghi committed Apr 17, 2024
1 parent f4e263e commit aadc73b
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 71 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</parent>
<groupId>edu.stanford.protege</groupId>
<artifactId>webprotege-ipc</artifactId>
<version>0.10.3-SNAPSHOT</version>
<version>0.10.4</version>
<name>webprotege-ipc</name>
<description>Inter Process Communication framework</description>
<properties>
Expand Down Expand Up @@ -57,7 +57,7 @@
<dependency>
<groupId>edu.stanford.protege</groupId>
<artifactId>webprotege-common</artifactId>
<version>0.9.5</version>
<version>0.9.6-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import edu.stanford.protege.webprotege.ipc.EventDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
Expand Down Expand Up @@ -49,9 +50,9 @@ public void dispatchEvent(Event event) {
var projectId = ((ProjectEvent) event).projectId().value();
message.getMessageProperties().getHeaders().put(PROJECT_ID, projectId);
}
eventRabbitTemplate.send(message);
logger.info("Sent event message");
} catch (JsonProcessingException e) {
eventRabbitTemplate.convertAndSend(RabbitMQEventsConfiguration.EVENT_EXCHANGE, "", message);
logger.info("Sent event message!");
} catch (JsonProcessingException | AmqpException e) {
logger.info("Could not serialize event: {}", e.getMessage(), e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void onMessage(Message message) {
EventHandler eventHandler = eventHandlers.stream()
.filter(handler -> {
String channel = String.valueOf(message.getMessageProperties().getHeaders().get(CHANNEL));
return handler.getChannelName().equals(channel);
return channel.contains(handler.getChannelName());
}).findFirst()
.orElse(null);
if(eventHandler != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package edu.stanford.protege.webprotege.ipc.impl;


import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.common.Event;
import edu.stanford.protege.webprotege.ipc.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

@Configuration
public class RabbitMQEventsConfiguration {
private final static Logger logger = LoggerFactory.getLogger(RabbitMQEventsConfiguration.class);

@Value("${webprotege.rabbitmq.timeout}")
public Long rabbitMqTimeout;

@Value("${webprotege.rabbitmq.eventsqueue:EVENTS_QUEUE}")
public String eventsQueue;


public static final String EVENT_EXCHANGE = "webprotege-event-exchange";


@Autowired(required = false)
private List<EventHandler<? extends Event>> eventHandlers = new ArrayList<>();

@Autowired
private ObjectMapper objectMapper;

@Bean
public Queue eventsQueue() {
return new Queue(eventsQueue, true);
}


@Bean
FanoutExchange eventExchange() {
return new FanoutExchange(EVENT_EXCHANGE, true, false);
}


@Bean(name = "eventRabbitTemplate")
public RabbitTemplate eventRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(rabbitMqTimeout);
rabbitTemplate.setExchange(EVENT_EXCHANGE);
return rabbitTemplate;
}

@Bean
@ConditionalOnProperty(havingValue = "true", prefix = "webprotege.rabbitmq", name = "event-subscribe")
public SimpleMessageListenerContainer eventsListenerContainer(ConnectionFactory connectionFactory) {
logger.info("[RabbitMQEventConfiguration] Listening to event queue {}", eventsQueue);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(eventsQueue);
container.setMessageListener(new RabbitMQEventHandlerWrapper(eventHandlers, objectMapper));
return container;
}

@Bean
@ConditionalOnProperty(havingValue = "true", prefix = "webprotege.rabbitmq", name = "event-subscribe")
public Binding binding(Queue eventsQueue, FanoutExchange fanoutExchange) {
logger.info("[RabbitMQEventConfiguration] Binding to event queue {}", eventsQueue);

return BindingBuilder.bind(eventsQueue).to(fanoutExchange);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
import com.rabbitmq.client.Channel;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusRequest;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusResponse;
import edu.stanford.protege.webprotege.common.Event;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.ipc.CommandExecutor;
import org.springframework.context.ApplicationContext;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import edu.stanford.protege.webprotege.ipc.CommandHandler;
import edu.stanford.protege.webprotege.ipc.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
Expand All @@ -33,6 +31,7 @@
import java.util.concurrent.TimeoutException;

@Configuration
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public class RabbitMqConfiguration {

private final static Logger logger = LoggerFactory.getLogger(RabbitMqConfiguration.class);
Expand All @@ -50,56 +49,41 @@ public class RabbitMqConfiguration {

public static final String COMMANDS_EXCHANGE = "webprotege-exchange";

public static final String EVENT_EXCHANGE = "webprotege-event-exchange";

public static final String EVENT_QUEUE = "webprotege-event-queue";

@Autowired
private ConnectionFactory connectionFactory;

@Autowired(required = false)
private List<EventHandler<? extends Event>> eventHandlers = new ArrayList<>();

@Autowired
private ObjectMapper objectMapper;

@Autowired(required = false)
private List<CommandHandler<? extends Request, ? extends Response>> handlers = new ArrayList<>();


@Autowired
private ApplicationContext applicationContext;

@Autowired
private ObjectMapper objectMapper;

@Autowired
private CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor;

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
Queue msgQueue() {
return new Queue(getCommandQueue(), true);
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
Queue replyQueue() {
return new Queue(getCommandResponseQueue(), true);
}

@Bean
public Queue eventsQueue() {
return new Queue(EVENT_QUEUE, true);
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
DirectExchange exchange() {
return new DirectExchange(COMMANDS_EXCHANGE, true, false);
}

@Bean
FanoutExchange eventExchange() {
return new FanoutExchange(EVENT_EXCHANGE, true, false);
}


@Bean(name = "rabbitTemplate")
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(rabbitMqTimeout);
Expand All @@ -108,29 +92,15 @@ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
}

@Bean(name = "asyncRabbitTemplate")
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public AsyncRabbitTemplate asyncRabbitTemplate(@Qualifier("rabbitTemplate") RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer replyListenerContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyListenerContainer, getCommandResponseQueue());
}

@Bean(name = "eventRabbitTemplate")
public RabbitTemplate eventRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(rabbitMqTimeout);
rabbitTemplate.setExchange(EVENT_EXCHANGE);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer eventsListenerContainer(ConnectionFactory connectionFactory, Queue eventsQueue, @Qualifier("eventRabbitTemplate") RabbitTemplate eventRabbitTemplate) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(EVENT_QUEUE);
container.setMessageListener(new RabbitMQEventHandlerWrapper(eventHandlers, objectMapper));
return container;
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory, Queue replyQueue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
Expand All @@ -139,11 +109,13 @@ public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory c
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public RabbitMqCommandHandlerWrapper rabbitMqCommandHandlerWrapper(){
return new RabbitMqCommandHandlerWrapper<>(handlers, objectMapper, authorizationStatusExecutor);
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public SimpleMessageListenerContainer messageListenerContainers() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setQueueNames(getCommandQueue());
Expand All @@ -152,29 +124,8 @@ public SimpleMessageListenerContainer messageListenerContainers() {
return container;
}

@Bean
public List<Binding> eventsBindings(FanoutExchange fanoutExchange, Queue eventsQueue) {
var response = new ArrayList<Binding>();

try (Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(true)) {
channel.exchangeDeclare(EVENT_EXCHANGE, "fanout", true);
channel.queueDeclare(EVENT_QUEUE, true, false, false, null);

for(EventHandler eventHandler : eventHandlers) {
channel.queueBind(eventsQueue.getName(), fanoutExchange.getName(), eventHandler.getChannelName());
response.add(BindingBuilder.bind(eventsQueue).to(fanoutExchange));
}
} catch (IOException | TimeoutException e) {
logger.error("Error initialize bindings", e);
throw new RuntimeException("Error initialize bindings", e);
}


return response;
}

@PostConstruct
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public void createBindings() {
try (Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.springframework.boot.test.json.JacksonTester;
import org.springframework.http.HttpStatus;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;

import java.io.IOException;

Expand All @@ -20,6 +21,7 @@
@SpringBootTest
@AutoConfigureJsonTesters
@ContextConfiguration(classes = WebProtegeIpcApplication.class)
@TestPropertySource(properties = "webprotege.rabbitmq.commands-subscribe=false")
public class CommandExecutionException_Tests extends IntegrationTestsExtension {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* 2022-02-09
*/
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class CommandExecutor_CommandHandler_ExceptionThrowing_TestsCase extends IntegrationTestsExtension {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* 2021-08-03
*/
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class CommandExecutor_CommandHandler_Test extends IntegrationTestsExtension {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.TestPropertySource;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand All @@ -25,6 +26,7 @@
* 2022-02-08
*/
@SpringBootTest(classes = WebProtegeIpcApplication.class)
@TestPropertySource(properties = "webprotege.rabbitmq.event-subscribe=true")
public class EventHandler_TestCase extends IntegrationTestsExtension {
public static CountDownLatch countDownLatch;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package edu.stanford.protege.webprotege.ipc;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.RabbitMQContainer;
Expand All @@ -21,6 +24,7 @@
@SpringBootTest(properties = {"spring.mongodb.embedded.version=5.0.6"})
@AutoConfigureWebTestClient
@Testcontainers
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class IntegrationTestsExtension {

private static Logger logger = LoggerFactory.getLogger(IntegrationTestsExtension.class);
Expand All @@ -36,4 +40,14 @@ static void configure(DynamicPropertyRegistry registry) {
registry.add("spring.rabbitmq.host", rabbitContainer::getHost);
registry.add("spring.rabbitmq.port", rabbitContainer::getAmqpPort);
}

@BeforeAll
public static void containerSetUp(){
rabbitContainer.start();
}

@AfterAll
public static void containerTearDown(){
rabbitContainer.stop();
}
}

0 comments on commit aadc73b

Please sign in to comment.