Skip to content

Commit

Permalink
changed to async communication
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsilaghi committed Mar 15, 2024
1 parent b087b17 commit 35d4f77
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 88 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package edu.stanford.protege.webprotege.ipc.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
Expand All @@ -9,11 +10,13 @@
import edu.stanford.protege.webprotege.ipc.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;


Expand All @@ -32,12 +35,9 @@ public class CommandExecutorImpl<Q extends Request<R>, R extends Response> imple
private final Class<R> responseClass;


@Autowired
private ObjectMapper objectMapper;

@Autowired
@Lazy
private RabbitTemplate rabbitTemplate;
private AsyncRabbitTemplate asyncRabbitTemplate;

public CommandExecutorImpl(Class<R> responseClass) {
this.responseClass = responseClass;
Expand All @@ -51,25 +51,46 @@ public CompletableFuture<R> execute(Q request, ExecutionContext executionContext
rabbitRequest.getMessageProperties().getHeaders().put(Headers.ACCESS_TOKEN, executionContext.jwt());
rabbitRequest.getMessageProperties().getHeaders().put(Headers.USER_ID, executionContext.userId());
rabbitRequest.getMessageProperties().getHeaders().put(Headers.METHOD, request.getChannel());
org.springframework.amqp.core.Message rabbitResponse = rabbitTemplate.sendAndReceive(request.getChannel(), rabbitRequest);

CompletableFuture<R> replyHandler = new CompletableFuture<>();

assert rabbitResponse != null;
var error = (String) rabbitResponse.getMessageProperties().getHeaders().get(Headers.ERROR);
if (error != null) {
var executionException = objectMapper.readValue(error, CommandExecutionException.class);
replyHandler.completeExceptionally(executionException);
}
else {
var response = objectMapper.readValue(rabbitResponse.getBody(), responseClass);
replyHandler.complete(response);
}
return replyHandler;
return asyncRabbitTemplate.sendAndReceive(request.getChannel(), rabbitRequest).thenApply(this::handleResponse);
} catch (Exception e) {
logger.error("Error ", e);
throw new RuntimeException(e);
}
}


private R handleResponse(Message rabbitResponse) {
var exception = (String) rabbitResponse.getMessageProperties().getHeaders().get(Headers.ERROR);

if(exception != null) {
try {
logger.error("Found error on response " + exception);
throw objectMapper.readValue(exception, CommandExecutionException.class);
} catch (JsonProcessingException e) {
logger.error("Error ", e);
throw new RuntimeException(e);
}
} else {
try {
return objectMapper.readValue(rabbitResponse.getBody(), responseClass);
} catch (IOException e) {
logger.error("Error ", e);

throw new RuntimeException(e);
}
}
}

@Autowired
@Qualifier("asyncRabbitTemplate")
@Lazy
public void setAsyncRabbitTemplate(AsyncRabbitTemplate asyncRabbitTemplate) {
this.asyncRabbitTemplate = asyncRabbitTemplate;
}

@Autowired
public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;

import java.util.Optional;
Expand All @@ -33,7 +34,7 @@ public class RabbitMQEventDispatcher implements EventDispatcher {


public RabbitMQEventDispatcher(ObjectMapper objectMapper,
RabbitTemplate eventRabbitTemplate) {
@Qualifier("eventRabbitTemplate") RabbitTemplate eventRabbitTemplate) {
this.objectMapper = objectMapper;
this.eventRabbitTemplate = eventRabbitTemplate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.http.HttpStatus;

Expand All @@ -33,14 +35,15 @@ public class RabbitMqCommandHandlerWrapper<Q extends Request<R>, R extends Respo

private final List<CommandHandler<? extends Request, ? extends Response>> handlers;

private final AsyncRabbitTemplate asyncRabbitTemplate;

private final ObjectMapper objectMapper;

private final CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor;

public RabbitMqCommandHandlerWrapper(List<CommandHandler<? extends Request, ? extends Response>> commandHandlers,
ObjectMapper objectMapper,
CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor) {
this.handlers = commandHandlers;
public RabbitMqCommandHandlerWrapper(List<CommandHandler<? extends Request, ? extends Response>> handlers, AsyncRabbitTemplate asyncRabbitTemplate, ObjectMapper objectMapper, CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor) {
this.handlers = handlers;
this.asyncRabbitTemplate = asyncRabbitTemplate;
this.objectMapper = objectMapper;
this.authorizationStatusExecutor = authorizationStatusExecutor;
}
Expand Down Expand Up @@ -87,6 +90,7 @@ public void onMessage(Message message, Channel channel) throws Exception {
}

CommandHandler handler = extractHandler(messageType);
logger.info("Dispatch handling to {}", handler.getClass());
parseAndHandleRequest(handler, message, channel, new UserId(userId), accessToken);
}

Expand Down Expand Up @@ -120,9 +124,11 @@ private void parseAndHandleRequest(CommandHandler<Q,R> handler, Message message,
}

private CommandHandler<? extends Request, ? extends Response> extractHandler(String messageType){
return this.handlers.stream().filter(handler -> handler.getChannelName().equalsIgnoreCase(messageType))
return this.handlers.stream().filter(handler -> {
return handler.getChannelName().equalsIgnoreCase(messageType);
} )
.findFirst()
.orElseThrow(() -> new RuntimeException("Invalid message type" + messageType));
.orElseThrow(() -> new RuntimeException("Invalid message type " + messageType));
}

private void authorizeAndReplyToRequest(CommandHandler<Q,R> handler,
Expand Down Expand Up @@ -215,13 +221,13 @@ private void replyWithErrorResponse(Message message, Channel channel, UserId use

private void replyWithSuccessResponse(Channel channel, Message message, UserId userId, R response) {
try {

var value = objectMapper.writeValueAsBytes(response);
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(message.getMessageProperties().getCorrelationId())
.build();
channel.basicPublish(COMMANDS_EXCHANGE, message.getMessageProperties().getReplyTo(), replyProps, value);

MessageBuilder messageBuilder = MessageBuilder.withBody(value);

Message replyMessage = messageBuilder.copyProperties(message.getMessageProperties()).build();

asyncRabbitTemplate.sendAndReceive(message.getMessageProperties().getReplyTo(), replyMessage);
} catch (JsonProcessingException e) {
logger.error("Am erroare ", e);
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
Expand Down
Loading

0 comments on commit 35d4f77

Please sign in to comment.