diff --git a/pom.xml b/pom.xml
index 524857a..ea1f061 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
org.springframework.boot
spring-boot-starter-amqp
+ 3.0.8
org.testcontainers
diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/CommandExecutorImpl.java b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/CommandExecutorImpl.java
index 30a1824..dded056 100644
--- a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/CommandExecutorImpl.java
+++ b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/CommandExecutorImpl.java
@@ -1,5 +1,6 @@
package edu.stanford.protege.webprotege.ipc.impl;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
@@ -9,11 +10,13 @@
import edu.stanford.protege.webprotege.ipc.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -32,12 +35,9 @@ public class CommandExecutorImpl, R extends Response> imple
private final Class responseClass;
- @Autowired
private ObjectMapper objectMapper;
- @Autowired
- @Lazy
- private RabbitTemplate rabbitTemplate;
+ private AsyncRabbitTemplate asyncRabbitTemplate;
public CommandExecutorImpl(Class responseClass) {
this.responseClass = responseClass;
@@ -51,25 +51,46 @@ public CompletableFuture execute(Q request, ExecutionContext executionContext
rabbitRequest.getMessageProperties().getHeaders().put(Headers.ACCESS_TOKEN, executionContext.jwt());
rabbitRequest.getMessageProperties().getHeaders().put(Headers.USER_ID, executionContext.userId());
rabbitRequest.getMessageProperties().getHeaders().put(Headers.METHOD, request.getChannel());
- org.springframework.amqp.core.Message rabbitResponse = rabbitTemplate.sendAndReceive(request.getChannel(), rabbitRequest);
- CompletableFuture replyHandler = new CompletableFuture<>();
-
- assert rabbitResponse != null;
- var error = (String) rabbitResponse.getMessageProperties().getHeaders().get(Headers.ERROR);
- if (error != null) {
- var executionException = objectMapper.readValue(error, CommandExecutionException.class);
- replyHandler.completeExceptionally(executionException);
- }
- else {
- var response = objectMapper.readValue(rabbitResponse.getBody(), responseClass);
- replyHandler.complete(response);
- }
- return replyHandler;
+ return asyncRabbitTemplate.sendAndReceive(request.getChannel(), rabbitRequest).thenApply(this::handleResponse);
} catch (Exception e) {
logger.error("Error ", e);
throw new RuntimeException(e);
}
}
+
+ private R handleResponse(Message rabbitResponse) {
+ var exception = (String) rabbitResponse.getMessageProperties().getHeaders().get(Headers.ERROR);
+
+ if(exception != null) {
+ try {
+ logger.error("Found error on response " + exception);
+ throw objectMapper.readValue(exception, CommandExecutionException.class);
+ } catch (JsonProcessingException e) {
+ logger.error("Error ", e);
+ throw new RuntimeException(e);
+ }
+ } else {
+ try {
+ return objectMapper.readValue(rabbitResponse.getBody(), responseClass);
+ } catch (IOException e) {
+ logger.error("Error ", e);
+
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Autowired
+ @Qualifier("asyncRabbitTemplate")
+ @Lazy
+ public void setAsyncRabbitTemplate(AsyncRabbitTemplate asyncRabbitTemplate) {
+ this.asyncRabbitTemplate = asyncRabbitTemplate;
+ }
+
+ @Autowired
+ public void setObjectMapper(ObjectMapper objectMapper) {
+ this.objectMapper = objectMapper;
+ }
}
diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventDispatcher.java b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventDispatcher.java
index 7b64c5c..e94ffa9 100644
--- a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventDispatcher.java
+++ b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMQEventDispatcher.java
@@ -11,6 +11,7 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import java.util.Optional;
@@ -33,7 +34,7 @@ public class RabbitMQEventDispatcher implements EventDispatcher {
public RabbitMQEventDispatcher(ObjectMapper objectMapper,
- RabbitTemplate eventRabbitTemplate) {
+ @Qualifier("eventRabbitTemplate") RabbitTemplate eventRabbitTemplate) {
this.objectMapper = objectMapper;
this.eventRabbitTemplate = eventRabbitTemplate;
}
diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqCommandHandlerWrapper.java b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqCommandHandlerWrapper.java
index f5e1712..11a40a0 100644
--- a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqCommandHandlerWrapper.java
+++ b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqCommandHandlerWrapper.java
@@ -15,6 +15,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.http.HttpStatus;
@@ -33,14 +35,15 @@ public class RabbitMqCommandHandlerWrapper, R extends Respo
private final List> handlers;
+ private final AsyncRabbitTemplate asyncRabbitTemplate;
private final ObjectMapper objectMapper;
+
private final CommandExecutor authorizationStatusExecutor;
- public RabbitMqCommandHandlerWrapper(List> commandHandlers,
- ObjectMapper objectMapper,
- CommandExecutor authorizationStatusExecutor) {
- this.handlers = commandHandlers;
+ public RabbitMqCommandHandlerWrapper(List> handlers, AsyncRabbitTemplate asyncRabbitTemplate, ObjectMapper objectMapper, CommandExecutor authorizationStatusExecutor) {
+ this.handlers = handlers;
+ this.asyncRabbitTemplate = asyncRabbitTemplate;
this.objectMapper = objectMapper;
this.authorizationStatusExecutor = authorizationStatusExecutor;
}
@@ -87,6 +90,7 @@ public void onMessage(Message message, Channel channel) throws Exception {
}
CommandHandler handler = extractHandler(messageType);
+ logger.info("Dispatch handling to {}", handler.getClass());
parseAndHandleRequest(handler, message, channel, new UserId(userId), accessToken);
}
@@ -120,9 +124,11 @@ private void parseAndHandleRequest(CommandHandler handler, Message message,
}
private CommandHandler extends Request, ? extends Response> extractHandler(String messageType){
- return this.handlers.stream().filter(handler -> handler.getChannelName().equalsIgnoreCase(messageType))
+ return this.handlers.stream().filter(handler -> {
+ return handler.getChannelName().equalsIgnoreCase(messageType);
+ } )
.findFirst()
- .orElseThrow(() -> new RuntimeException("Invalid message type" + messageType));
+ .orElseThrow(() -> new RuntimeException("Invalid message type " + messageType));
}
private void authorizeAndReplyToRequest(CommandHandler handler,
@@ -215,13 +221,13 @@ private void replyWithErrorResponse(Message message, Channel channel, UserId use
private void replyWithSuccessResponse(Channel channel, Message message, UserId userId, R response) {
try {
-
var value = objectMapper.writeValueAsBytes(response);
- AMQP.BasicProperties replyProps = new AMQP.BasicProperties
- .Builder()
- .correlationId(message.getMessageProperties().getCorrelationId())
- .build();
- channel.basicPublish(COMMANDS_EXCHANGE, message.getMessageProperties().getReplyTo(), replyProps, value);
+
+ MessageBuilder messageBuilder = MessageBuilder.withBody(value);
+
+ Message replyMessage = messageBuilder.copyProperties(message.getMessageProperties()).build();
+
+ asyncRabbitTemplate.sendAndReceive(message.getMessageProperties().getReplyTo(), replyMessage);
} catch (JsonProcessingException e) {
logger.error("Am erroare ", e);
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java
index dc3f668..3f21ec0 100644
--- a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java
+++ b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java
@@ -9,22 +9,27 @@
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.ipc.CommandExecutor;
+import org.springframework.context.ApplicationContext;
import edu.stanford.protege.webprotege.ipc.CommandHandler;
import edu.stanford.protege.webprotege.ipc.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Lazy;
+import javax.annotation.PostConstruct;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
@@ -34,11 +39,29 @@ public class RabbitMqConfiguration {
private final static Logger logger = LoggerFactory.getLogger(RabbitMqConfiguration.class);
+ private final static String ipAddress;
+
+ static {
+ try {
+ ipAddress = InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Value("${webprotege.rabbitmq.responsequeue}")
public String COMMANDS_RESPONSE_QUEUE;
+
+ private String getCommandResponseQueue() {
+ return COMMANDS_RESPONSE_QUEUE + ipAddress;
+ }
@Value("${webprotege.rabbitmq.requestqueue}")
public String COMMANDS_QUEUE;
+ private String getCommandQueue(){
+ return COMMANDS_QUEUE ;
+ }
+
public static final String COMMANDS_EXCHANGE = "webprotege-exchange";
public static final String EVENT_EXCHANGE = "webprotege-event-exchange";
@@ -49,26 +72,30 @@ public class RabbitMqConfiguration {
private ConnectionFactory connectionFactory;
@Autowired(required = false)
- private List> commandHandlers = new ArrayList<>();
+ private List> eventHandlers = new ArrayList<>();
+
@Autowired(required = false)
- private List> eventHandlers = new ArrayList<>();
+ private List> handlers;
+
+
+ @Autowired
+ private ApplicationContext applicationContext;
@Autowired
private ObjectMapper objectMapper;
@Autowired
- @Lazy
- CommandExecutor authorizationStatusExecutor;
+ private CommandExecutor authorizationStatusExecutor;
@Bean
Queue msgQueue() {
- return new Queue(COMMANDS_QUEUE, true);
+ return new Queue(getCommandQueue(), true);
}
@Bean
Queue replyQueue() {
- return new Queue(COMMANDS_RESPONSE_QUEUE, true);
+ return new Queue(getCommandResponseQueue(), true);
}
@Bean
@@ -77,8 +104,8 @@ public Queue eventsQueue() {
}
@Bean
- TopicExchange exchange() {
- return new TopicExchange(COMMANDS_EXCHANGE, true, false);
+ DirectExchange exchange() {
+ return new DirectExchange(COMMANDS_EXCHANGE, true, false);
}
@Bean
@@ -86,19 +113,20 @@ FanoutExchange eventExchange() {
return new FanoutExchange(EVENT_EXCHANGE, true, false);
}
-
-
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setReplyAddress(COMMANDS_RESPONSE_QUEUE);
- rabbitTemplate.setReplyTimeout(60000);
- rabbitTemplate.setExchange(COMMANDS_EXCHANGE);
- rabbitTemplate.setUseDirectReplyToContainer(false);
- return rabbitTemplate;
+ @Bean(name = "rabbitTemplate")
+ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
+ rabbitTemplate.setReplyTimeout(60000);
+ rabbitTemplate.setExchange(COMMANDS_EXCHANGE);
+ return rabbitTemplate;
+ }
+
+ @Bean(name = "asyncRabbitTemplate")
+ public AsyncRabbitTemplate asyncRabbitTemplate(@Qualifier("rabbitTemplate") RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer replyListenerContainer) {
+ return new AsyncRabbitTemplate(rabbitTemplate, replyListenerContainer, getCommandResponseQueue());
}
- @Bean
+ @Bean(name = "eventRabbitTemplate")
public RabbitTemplate eventRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(60000);
@@ -108,7 +136,7 @@ public RabbitTemplate eventRabbitTemplate(ConnectionFactory connectionFactory) {
}
@Bean
- public SimpleMessageListenerContainer eventsListenerContainer(ConnectionFactory connectionFactory, Queue eventsQueue, RabbitTemplate eventRabbitTemplate) {
+ public SimpleMessageListenerContainer eventsListenerContainer(ConnectionFactory connectionFactory, Queue eventsQueue, @Qualifier("eventRabbitTemplate") RabbitTemplate eventRabbitTemplate) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(EVENT_QUEUE);
@@ -117,50 +145,25 @@ public SimpleMessageListenerContainer eventsListenerContainer(ConnectionFactory
}
@Bean
- public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory, Queue replyQueue, RabbitTemplate rabbitTemplate) {
+ public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory, Queue replyQueue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(replyQueue);
- container.setMessageListener(rabbitTemplate);
return container;
}
@Bean
- public SimpleMessageListenerContainer messageListenerContainers(ConnectionFactory connectionFactory) {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setQueueNames(COMMANDS_QUEUE);
- container.setConnectionFactory(connectionFactory);
- container.setMessageListener(new RabbitMqCommandHandlerWrapper<>(commandHandlers, objectMapper, authorizationStatusExecutor));
- return container;
+ public RabbitMqCommandHandlerWrapper rabbitMqCommandHandlerWrapper(AsyncRabbitTemplate asyncRabbitTemplate){
+ return new RabbitMqCommandHandlerWrapper<>(handlers, asyncRabbitTemplate, objectMapper, authorizationStatusExecutor);
}
@Bean
- public List bindings(TopicExchange topicExchange, Queue msgQueue, Queue replyQueue) {
-
- var response = new ArrayList();
- try (Connection connection = connectionFactory.createConnection();
- Channel channel = connection.createChannel(true)) {
- channel.exchangeDeclare(COMMANDS_EXCHANGE, "topic", true);
- channel.queueDeclare(COMMANDS_QUEUE, true, false, false, null);
- channel.queueDeclare(COMMANDS_RESPONSE_QUEUE, true, false, false, null);
- channel.basicQos(1);
-
- for (CommandHandler handler : commandHandlers) {
- logger.info("Declaring binding queue {} to exchange {} with key {}", COMMANDS_QUEUE, COMMANDS_EXCHANGE, handler.getChannelName());
- channel.queueBind(COMMANDS_QUEUE, COMMANDS_EXCHANGE, handler.getChannelName());
- response.add(BindingBuilder.bind(msgQueue).to(topicExchange).with(handler.getChannelName()));
- }
- channel.queueBind(COMMANDS_RESPONSE_QUEUE, COMMANDS_EXCHANGE, COMMANDS_RESPONSE_QUEUE);
-
- response.add(BindingBuilder.bind(replyQueue).to(topicExchange).with(replyQueue.getName()));
- channel.close();
- connection.close();
- return response;
-
- } catch (Exception e) {
- logger.error("Error initialize bindings", e);
- }
- return response;
+ public SimpleMessageListenerContainer messageListenerContainers(AsyncRabbitTemplate asyncRabbitTemplate) {
+ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
+ container.setQueueNames(getCommandQueue());
+ container.setConnectionFactory(connectionFactory);
+ container.setMessageListener(rabbitMqCommandHandlerWrapper(asyncRabbitTemplate));
+ return container;
}
@Bean
@@ -184,5 +187,24 @@ public List eventsBindings(FanoutExchange fanoutExchange, Queue eventsQ
return response;
}
+ @PostConstruct
+ public void createBindings() {
+ try (Connection connection = connectionFactory.createConnection();
+ Channel channel = connection.createChannel(true)) {
+ channel.exchangeDeclare(COMMANDS_EXCHANGE, "direct", true);
+ channel.queueDeclare(getCommandQueue(), true, false, false, null);
+ channel.queueDeclare(getCommandResponseQueue(), true, false, false, null);
+ channel.basicQos(1);
+
+ for (CommandHandler handler : handlers) {
+ logger.info("Declaring binding queue {} to exchange {} with key {}", getCommandQueue(), COMMANDS_EXCHANGE, handler.getChannelName());
+ channel.queueBind(getCommandQueue(), COMMANDS_EXCHANGE, handler.getChannelName());
+ }
+ channel.queueBind(getCommandResponseQueue(), COMMANDS_EXCHANGE, getCommandResponseQueue());
+
+ } catch (Exception e) {
+ logger.error("Error initialize bindings", e);
+ }
+ }
}