Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

increase timeout #6

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</parent>
<groupId>edu.stanford.protege</groupId>
<artifactId>webprotege-ipc</artifactId>
<version>1.0.3</version>
<version>1.0.5</version>
<name>webprotege-ipc</name>
<description>Inter Process Communication framework</description>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private R handleResponse(Message rabbitResponse) {

if(exception != null) {
try {
logger.error("Found error on response " + exception);
logger.error("Found error on response {}. Action : {}" ,exception, rabbitResponse.getMessageProperties().getHeaders().get(Headers.METHOD));
throw objectMapper.readValue(exception, CommandExecutionException.class);
} catch (JsonProcessingException e) {
logger.error("Error ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public void dispatchEvent(Event event) {
message.getMessageProperties().getHeaders().put(PROJECT_ID, projectId);
}
eventRabbitTemplate.convertAndSend(RabbitMQEventsConfiguration.EVENT_EXCHANGE, "", message);
logger.info("Sent event message!");
} catch (JsonProcessingException | AmqpException e) {
logger.info("Could not serialize event: {}", e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public RabbitMQEventHandlerWrapper(List<EventHandler<? extends Event>> eventHand

@Override
public void onMessage(Message message) {
logger.info("Handling event with id {}", message.getMessageProperties().getMessageId());
EventHandler eventHandler = eventHandlers.stream()
.filter(handler -> {
String channel = String.valueOf(message.getMessageProperties().getHeaders().get(CHANNEL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public RabbitMqCommandHandlerWrapper(List<CommandHandler<? extends Request, ? ex

@Override
public void onMessage(Message message, Channel channel) throws Exception {
logger.info("Received message " + message);

var replyChannel = message.getMessageProperties().getReplyTo();
if (replyChannel == null) {
String errorMessage = Headers.REPLY_CHANNEL + " header is missing. Cannot reply to message.";
Expand Down Expand Up @@ -86,7 +84,6 @@ public void onMessage(Message message, Channel channel) throws Exception {
}

CommandHandler handler = extractHandler(messageType);
logger.info("Dispatch handling to {}", handler.getClass());
parseAndHandleRequest(handler, message, channel, new UserId(userId), accessToken);
}

Expand Down Expand Up @@ -171,10 +168,15 @@ private void authorizeAndReplyToRequest(CommandHandler<Q,R> handler,

private void handleAndReplyToRequest(CommandHandler<Q,R> handler, Channel channel, Message message, UserId userId, Q request, String accessToken) {
var executionContext = new ExecutionContext(userId, accessToken);
long startTime = System.currentTimeMillis();

try {
var response = handler.handleRequest(request, executionContext);
response.subscribe(r -> {
replyWithSuccessResponse(channel, message, userId, r);
long endtime = System.currentTimeMillis();
logger.info("Request executed " + request.getChannel() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}, throwable -> {
if (throwable instanceof CommandExecutionException ex) {
logger.info(
Expand All @@ -183,14 +185,23 @@ private void handleAndReplyToRequest(CommandHandler<Q,R> handler, Channel channe
throwable.getMessage(),
request);
replyWithErrorResponse(message,channel, userId, ex.getStatus());
long endtime = System.currentTimeMillis();
logger.info("Request failed " + request.getChannel() + "with error " + throwable.getMessage() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");
}
else {
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
long endtime = System.currentTimeMillis();

logger.info("Request failed " + request.getChannel() + "with error " + throwable.getMessage() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}
});
} catch (Throwable throwable) {
logger.error("Uncaught exception when handling request", throwable);
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
long endtime = System.currentTimeMillis();
logger.info("Request failed " + request.getChannel() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
@Bean(name = "asyncRabbitTemplate")
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public AsyncRabbitTemplate asyncRabbitTemplate(@Qualifier("rabbitTemplate") RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer replyListenerContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyListenerContainer, getCommandResponseQueue());
var asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate,
replyListenerContainer,
getCommandResponseQueue());
asyncRabbitTemplate.setReceiveTimeout(rabbitMqTimeout);
return asyncRabbitTemplate;
}


Expand All @@ -105,6 +109,7 @@ public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory c
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(replyQueue);
container.setConcurrency("15-20");
return container;
}

Expand All @@ -121,6 +126,7 @@ public SimpleMessageListenerContainer messageListenerContainers() {
container.setQueueNames(getCommandQueue());
container.setConnectionFactory(connectionFactory);
container.setMessageListener(rabbitMqCommandHandlerWrapper());
container.setConcurrency("15-20");
return container;
}

Expand Down
Loading