Skip to content

Commit

Permalink
changed to topic exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsilaghi committed Mar 12, 2024
1 parent adf8fc8 commit b087b17
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@

import edu.stanford.protege.webprotege.common.UserId;

import javax.annotation.Nonnull;
import java.util.Objects;

/**
* Matthew Horridge
* Stanford Center for Biomedical Informatics Research
* 2021-08-19
*/
public record ExecutionContext(UserId userId, String jwt) {
public record ExecutionContext(@Nonnull UserId userId, @Nonnull String jwt) {

public ExecutionContext() {
this(UserId.getGuest(), "");
}

public ExecutionContext {
Objects.requireNonNull(userId, "userId cannot be null");
Objects.requireNonNull(jwt, "jwt cannot be null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public Queue eventsQueue() {
}

@Bean
DirectExchange exchange() {
return new DirectExchange(COMMANDS_EXCHANGE, true, false);
TopicExchange exchange() {
return new TopicExchange(COMMANDS_EXCHANGE, true, false);
}

@Bean
Expand Down Expand Up @@ -135,31 +135,32 @@ public SimpleMessageListenerContainer messageListenerContainers(ConnectionFactor
}

@Bean
public List<Binding> bindings(DirectExchange directExchange, Queue msgQueue, Queue replyQueue) {
public List<Binding> bindings(TopicExchange topicExchange, Queue msgQueue, Queue replyQueue) {

var response = new ArrayList<Binding>();
try (Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(true)) {
channel.exchangeDeclare(COMMANDS_EXCHANGE, "direct", true);
channel.exchangeDeclare(COMMANDS_EXCHANGE, "topic", true);
channel.queueDeclare(COMMANDS_QUEUE, true, false, false, null);
channel.queueDeclare(COMMANDS_RESPONSE_QUEUE, true, false, false, null);

var response = new ArrayList<Binding>();
channel.basicQos(1);

for (CommandHandler handler : commandHandlers) {
logger.info("Declaring binding queue {} to exchange {} with key {}", COMMANDS_QUEUE, COMMANDS_EXCHANGE, handler.getChannelName());
channel.queueBind(COMMANDS_QUEUE, COMMANDS_EXCHANGE, handler.getChannelName());
response.add(BindingBuilder.bind(msgQueue).to(directExchange).with(handler.getChannelName()));
response.add(BindingBuilder.bind(msgQueue).to(topicExchange).with(handler.getChannelName()));
}
channel.queueBind(COMMANDS_RESPONSE_QUEUE, COMMANDS_EXCHANGE, COMMANDS_RESPONSE_QUEUE);

response.add(BindingBuilder.bind(replyQueue).to(directExchange).with(replyQueue.getName()));
response.add(BindingBuilder.bind(replyQueue).to(topicExchange).with(replyQueue.getName()));
channel.close();
connection.close();
return response;

} catch (Exception e) {
logger.error("Error ", e);
throw new RuntimeException(e);
logger.error("Error initialize bindings", e);
}
return response;
}

@Bean
Expand All @@ -176,7 +177,7 @@ public List<Binding> eventsBindings(FanoutExchange fanoutExchange, Queue eventsQ
response.add(BindingBuilder.bind(eventsQueue).to(fanoutExchange));
}
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
logger.error("Error initialize bindings", e);
}


Expand Down

0 comments on commit b087b17

Please sign in to comment.