Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix event sync #3

Merged
merged 3 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</parent>
<groupId>edu.stanford.protege</groupId>
<artifactId>webprotege-ipc</artifactId>
<version>0.10.3-SNAPSHOT</version>
<version>1.0.1</version>
<name>webprotege-ipc</name>
<description>Inter Process Communication framework</description>
<properties>
Expand Down Expand Up @@ -57,7 +57,7 @@
<dependency>
<groupId>edu.stanford.protege</groupId>
<artifactId>webprotege-common</artifactId>
<version>0.9.5</version>
<version>0.9.6</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import edu.stanford.protege.webprotege.ipc.EventDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
Expand Down Expand Up @@ -49,9 +50,9 @@ public void dispatchEvent(Event event) {
var projectId = ((ProjectEvent) event).projectId().value();
message.getMessageProperties().getHeaders().put(PROJECT_ID, projectId);
}
eventRabbitTemplate.send(message);
logger.info("Sent event message");
} catch (JsonProcessingException e) {
eventRabbitTemplate.convertAndSend(RabbitMQEventsConfiguration.EVENT_EXCHANGE, "", message);
logger.info("Sent event message!");
matthewhorridge marked this conversation as resolved.
Show resolved Hide resolved
} catch (JsonProcessingException | AmqpException e) {
logger.info("Could not serialize event: {}", e.getMessage(), e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void onMessage(Message message) {
EventHandler eventHandler = eventHandlers.stream()
.filter(handler -> {
String channel = String.valueOf(message.getMessageProperties().getHeaders().get(CHANNEL));
return handler.getChannelName().equals(channel);
return channel.contains(handler.getChannelName());
}).findFirst()
.orElse(null);
if(eventHandler != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package edu.stanford.protege.webprotege.ipc.impl;


import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.common.Event;
import edu.stanford.protege.webprotege.ipc.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

@Configuration
public class RabbitMQEventsConfiguration {
private final static Logger logger = LoggerFactory.getLogger(RabbitMQEventsConfiguration.class);

@Value("${webprotege.rabbitmq.timeout}")
public Long rabbitMqTimeout;

@Value("${webprotege.rabbitmq.eventsqueue:EVENTS_QUEUE}")
public String eventsQueue;


public static final String EVENT_EXCHANGE = "webprotege-event-exchange";


@Autowired(required = false)
private List<EventHandler<? extends Event>> eventHandlers = new ArrayList<>();

@Autowired
private ObjectMapper objectMapper;

@Bean
public Queue eventsQueue() {
return new Queue(eventsQueue, true);
}


@Bean
FanoutExchange eventExchange() {
return new FanoutExchange(EVENT_EXCHANGE, true, false);
}


@Bean(name = "eventRabbitTemplate")
public RabbitTemplate eventRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(rabbitMqTimeout);
rabbitTemplate.setExchange(EVENT_EXCHANGE);
return rabbitTemplate;
}

@Bean
@ConditionalOnProperty(havingValue = "true", prefix = "webprotege.rabbitmq", name = "event-subscribe")
public SimpleMessageListenerContainer eventsListenerContainer(ConnectionFactory connectionFactory) {
logger.info("[RabbitMQEventConfiguration] Listening to event queue {}", eventsQueue);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(eventsQueue);
container.setMessageListener(new RabbitMQEventHandlerWrapper(eventHandlers, objectMapper));
return container;
}

@Bean
@ConditionalOnProperty(havingValue = "true", prefix = "webprotege.rabbitmq", name = "event-subscribe")
public Binding binding(Queue eventsQueue, FanoutExchange fanoutExchange) {
logger.info("[RabbitMQEventConfiguration] Binding to event queue {}", eventsQueue);

return BindingBuilder.bind(eventsQueue).to(fanoutExchange);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
import com.rabbitmq.client.Channel;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusRequest;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusResponse;
import edu.stanford.protege.webprotege.common.Event;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.ipc.CommandExecutor;
import org.springframework.context.ApplicationContext;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import edu.stanford.protege.webprotege.ipc.CommandHandler;
import edu.stanford.protege.webprotege.ipc.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
Expand All @@ -33,6 +31,7 @@
import java.util.concurrent.TimeoutException;

@Configuration
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public class RabbitMqConfiguration {

private final static Logger logger = LoggerFactory.getLogger(RabbitMqConfiguration.class);
Expand All @@ -50,56 +49,41 @@ public class RabbitMqConfiguration {

public static final String COMMANDS_EXCHANGE = "webprotege-exchange";

public static final String EVENT_EXCHANGE = "webprotege-event-exchange";

public static final String EVENT_QUEUE = "webprotege-event-queue";

@Autowired
private ConnectionFactory connectionFactory;

@Autowired(required = false)
private List<EventHandler<? extends Event>> eventHandlers = new ArrayList<>();

@Autowired
private ObjectMapper objectMapper;

@Autowired(required = false)
private List<CommandHandler<? extends Request, ? extends Response>> handlers = new ArrayList<>();


@Autowired
private ApplicationContext applicationContext;

@Autowired
private ObjectMapper objectMapper;

@Autowired
private CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor;

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
Queue msgQueue() {
return new Queue(getCommandQueue(), true);
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
Queue replyQueue() {
return new Queue(getCommandResponseQueue(), true);
}

@Bean
public Queue eventsQueue() {
return new Queue(EVENT_QUEUE, true);
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
DirectExchange exchange() {
return new DirectExchange(COMMANDS_EXCHANGE, true, false);
}

@Bean
FanoutExchange eventExchange() {
return new FanoutExchange(EVENT_EXCHANGE, true, false);
}


@Bean(name = "rabbitTemplate")
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(rabbitMqTimeout);
Expand All @@ -108,29 +92,15 @@ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
}

@Bean(name = "asyncRabbitTemplate")
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public AsyncRabbitTemplate asyncRabbitTemplate(@Qualifier("rabbitTemplate") RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer replyListenerContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyListenerContainer, getCommandResponseQueue());
}

@Bean(name = "eventRabbitTemplate")
public RabbitTemplate eventRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(rabbitMqTimeout);
rabbitTemplate.setExchange(EVENT_EXCHANGE);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer eventsListenerContainer(ConnectionFactory connectionFactory, Queue eventsQueue, @Qualifier("eventRabbitTemplate") RabbitTemplate eventRabbitTemplate) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(EVENT_QUEUE);
container.setMessageListener(new RabbitMQEventHandlerWrapper(eventHandlers, objectMapper));
return container;
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory, Queue replyQueue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
Expand All @@ -139,11 +109,13 @@ public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory c
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public RabbitMqCommandHandlerWrapper rabbitMqCommandHandlerWrapper(){
return new RabbitMqCommandHandlerWrapper<>(handlers, objectMapper, authorizationStatusExecutor);
}

@Bean
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public SimpleMessageListenerContainer messageListenerContainers() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setQueueNames(getCommandQueue());
Expand All @@ -152,28 +124,8 @@ public SimpleMessageListenerContainer messageListenerContainers() {
return container;
}

@Bean
public List<Binding> eventsBindings(FanoutExchange fanoutExchange, Queue eventsQueue) {
var response = new ArrayList<Binding>();

try (Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(true)) {
channel.exchangeDeclare(EVENT_EXCHANGE, "fanout", true);
channel.queueDeclare(EVENT_QUEUE, true, false, false, null);

for(EventHandler eventHandler : eventHandlers) {
channel.queueBind(eventsQueue.getName(), fanoutExchange.getName(), eventHandler.getChannelName());
response.add(BindingBuilder.bind(eventsQueue).to(fanoutExchange));
}
} catch (IOException | TimeoutException e) {
logger.error("Error initialize bindings", e);
}


return response;
}

@PostConstruct
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public void createBindings() {
try (Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(true)) {
Expand All @@ -190,6 +142,7 @@ public void createBindings() {

} catch (IOException |TimeoutException e) {
logger.error("Error initialize bindings", e);
throw new RuntimeException("Error initialize bindings", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.springframework.boot.test.json.JacksonTester;
import org.springframework.http.HttpStatus;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;

import java.io.IOException;

Expand All @@ -20,6 +21,7 @@
@SpringBootTest
@AutoConfigureJsonTesters
@ContextConfiguration(classes = WebProtegeIpcApplication.class)
@TestPropertySource(properties = "webprotege.rabbitmq.commands-subscribe=false")
public class CommandExecutionException_Tests extends IntegrationTestsExtension {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* 2022-02-09
*/
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class CommandExecutor_CommandHandler_ExceptionThrowing_TestsCase extends IntegrationTestsExtension {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* 2021-08-03
*/
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class CommandExecutor_CommandHandler_Test extends IntegrationTestsExtension {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.TestPropertySource;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand All @@ -25,6 +26,7 @@
* 2022-02-08
*/
@SpringBootTest(classes = WebProtegeIpcApplication.class)
@TestPropertySource(properties = "webprotege.rabbitmq.event-subscribe=true")
public class EventHandler_TestCase extends IntegrationTestsExtension {
public static CountDownLatch countDownLatch;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package edu.stanford.protege.webprotege.ipc;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.RabbitMQContainer;
Expand All @@ -21,6 +24,7 @@
@SpringBootTest(properties = {"spring.mongodb.embedded.version=5.0.6"})
@AutoConfigureWebTestClient
@Testcontainers
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class IntegrationTestsExtension {

private static Logger logger = LoggerFactory.getLogger(IntegrationTestsExtension.class);
Expand All @@ -36,4 +40,14 @@ static void configure(DynamicPropertyRegistry registry) {
registry.add("spring.rabbitmq.host", rabbitContainer::getHost);
registry.add("spring.rabbitmq.port", rabbitContainer::getAmqpPort);
}

@BeforeAll
public static void containerSetUp(){
rabbitContainer.start();
}

@AfterAll
public static void containerTearDown(){
rabbitContainer.stop();
}
}
Loading