From b087b173e89b8d1f177e46b56b22259557fac0a6 Mon Sep 17 00:00:00 2001 From: silag Date: Tue, 12 Mar 2024 10:02:59 +0200 Subject: [PATCH] changed to topic exchange --- .../webprotege/ipc/ExecutionContext.java | 10 +++++++- .../ipc/impl/RabbitMqConfiguration.java | 23 ++++++++++--------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/ExecutionContext.java b/src/main/java/edu/stanford/protege/webprotege/ipc/ExecutionContext.java index 39e1995..ad77672 100644 --- a/src/main/java/edu/stanford/protege/webprotege/ipc/ExecutionContext.java +++ b/src/main/java/edu/stanford/protege/webprotege/ipc/ExecutionContext.java @@ -2,14 +2,22 @@ import edu.stanford.protege.webprotege.common.UserId; +import javax.annotation.Nonnull; +import java.util.Objects; + /** * Matthew Horridge * Stanford Center for Biomedical Informatics Research * 2021-08-19 */ -public record ExecutionContext(UserId userId, String jwt) { +public record ExecutionContext(@Nonnull UserId userId, @Nonnull String jwt) { public ExecutionContext() { this(UserId.getGuest(), ""); } + + public ExecutionContext { + Objects.requireNonNull(userId, "userId cannot be null"); + Objects.requireNonNull(jwt, "jwt cannot be null"); + } } diff --git a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java index 8cb16f3..dc3f668 100644 --- a/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java +++ b/src/main/java/edu/stanford/protege/webprotege/ipc/impl/RabbitMqConfiguration.java @@ -77,8 +77,8 @@ public Queue eventsQueue() { } @Bean - DirectExchange exchange() { - return new DirectExchange(COMMANDS_EXCHANGE, true, false); + TopicExchange exchange() { + return new TopicExchange(COMMANDS_EXCHANGE, true, false); } @Bean @@ -135,31 +135,32 @@ public SimpleMessageListenerContainer messageListenerContainers(ConnectionFactor } @Bean - public List bindings(DirectExchange directExchange, Queue msgQueue, Queue replyQueue) { + public List bindings(TopicExchange topicExchange, Queue msgQueue, Queue replyQueue) { + + var response = new ArrayList(); try (Connection connection = connectionFactory.createConnection(); Channel channel = connection.createChannel(true)) { - channel.exchangeDeclare(COMMANDS_EXCHANGE, "direct", true); + channel.exchangeDeclare(COMMANDS_EXCHANGE, "topic", true); channel.queueDeclare(COMMANDS_QUEUE, true, false, false, null); channel.queueDeclare(COMMANDS_RESPONSE_QUEUE, true, false, false, null); - - var response = new ArrayList(); + channel.basicQos(1); for (CommandHandler handler : commandHandlers) { logger.info("Declaring binding queue {} to exchange {} with key {}", COMMANDS_QUEUE, COMMANDS_EXCHANGE, handler.getChannelName()); channel.queueBind(COMMANDS_QUEUE, COMMANDS_EXCHANGE, handler.getChannelName()); - response.add(BindingBuilder.bind(msgQueue).to(directExchange).with(handler.getChannelName())); + response.add(BindingBuilder.bind(msgQueue).to(topicExchange).with(handler.getChannelName())); } channel.queueBind(COMMANDS_RESPONSE_QUEUE, COMMANDS_EXCHANGE, COMMANDS_RESPONSE_QUEUE); - response.add(BindingBuilder.bind(replyQueue).to(directExchange).with(replyQueue.getName())); + response.add(BindingBuilder.bind(replyQueue).to(topicExchange).with(replyQueue.getName())); channel.close(); connection.close(); return response; } catch (Exception e) { - logger.error("Error ", e); - throw new RuntimeException(e); + logger.error("Error initialize bindings", e); } + return response; } @Bean @@ -176,7 +177,7 @@ public List eventsBindings(FanoutExchange fanoutExchange, Queue eventsQ response.add(BindingBuilder.bind(eventsQueue).to(fanoutExchange)); } } catch (IOException | TimeoutException e) { - throw new RuntimeException(e); + logger.error("Error initialize bindings", e); }