Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure multiple topics per route #519

Merged
merged 3 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Automatically recover from channel-level exceptions. This involves a breaking change in the constructor of `FlusswerkConsumer`, which now requires a `RabbitClient` instead of a `Channel`.

### Changed
- Outgoing routes may now include a list of topics instead of a single topic. A `Route` can be used to send a message to several topics at once.

## [6.0.1] - 2023-12-06

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class RoutingProperties {
private final List<String> incoming;
private final Map<String, String> exchanges;
private final Map<String, String> deadLetterExchanges;
private final Map<String, String> outgoing;
private final Map<String, List<String>> outgoing;
private final Map<String, FailurePolicy> failurePolicies;

/**
Expand All @@ -39,7 +39,7 @@ public class RoutingProperties {
public RoutingProperties(
@NotBlank String exchange,
List<String> incoming,
Map<String, String> outgoing,
Map<String, List<String>> outgoing,
Map<String, String> exchanges,
Map<String, String> deadLetterExchanges,
Map<String, FailurePolicyProperties> failurePolicies) {
Expand Down Expand Up @@ -76,7 +76,8 @@ public RoutingProperties(
* @param outgoing The routes for outgoing messages.
* @return routing properties that rely on defaults wherever possible
*/
public static RoutingProperties minimal(List<String> incoming, Map<String, String> outgoing) {
public static RoutingProperties minimal(
List<String> incoming, Map<String, List<String>> outgoing) {
return new RoutingProperties(null, incoming, outgoing, null, null, null);
}

Expand All @@ -85,7 +86,7 @@ private void setupExchangeConfigurations(
String defaultDlx,
Map<String, String> specificExchanges,
Map<String, String> specificDeadLetterExchanges) {
Stream.concat(this.incoming.stream(), this.outgoing.values().stream())
Stream.concat(this.incoming.stream(), this.outgoing.values().stream().flatMap(List::stream))
.forEach(
queue -> {
String exchange = specificExchanges.getOrDefault(queue, defaultExchange);
Expand Down Expand Up @@ -142,9 +143,9 @@ public List<String> getIncoming() {
}

/**
* @return The topic to send to per default (optional).
* @return The queues to send to (optional).
*/
public Map<String, String> getOutgoing() {
public Map<String, List<String>> getOutgoing() {
return outgoing;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -44,11 +45,13 @@ public MessageBroker(RoutingProperties routing, RabbitClient rabbitClient) throw
*/
@Deprecated
void send(Message message) throws IOException {
var topic = routingConfig.getOutgoing().get("default");
if (topic == null) {
List<String> topics = routingConfig.getOutgoing().get("default");
if (topics == null || topics.isEmpty()) {
throw new RuntimeException("Cannot send message, no default queue specified");
}
send(topic, message);
for (String topic : topics) {
send(topic, message);
}
}

/**
Expand All @@ -60,11 +63,13 @@ void send(Message message) throws IOException {
*/
@Deprecated
public void send(Collection<? extends Message> messages) throws IOException {
var topic = routingConfig.getOutgoing().get("default");
if (topic == null) {
throw new RuntimeException("Cannot send messages, no default queue specified");
List<String> topics = routingConfig.getOutgoing().get("default");
if (topics == null || topics.isEmpty()) {
throw new RuntimeException("Cannot send message, no default queue specified");
}
for (String topic : topics) {
send(topic, messages);
}
send(topic, messages);
}

/**
Expand Down Expand Up @@ -188,7 +193,8 @@ private void provideInputQueues() throws IOException {
}

private void provideOutputQueues() throws IOException {
for (String topic : routingConfig.getOutgoing().values()) {
for (String topic :
routingConfig.getOutgoing().values().stream().flatMap(List::stream).toList()) {
bitzl marked this conversation as resolved.
Show resolved Hide resolved
rabbitClient.declareQueue(
topic,
routingConfig.getExchange(topic),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
public class RabbitMQ {

private final Map<String, Queue> queues;
private final Map<String, Topic> routes;
private final Map<String, Route> routes;
private final Map<String, Topic> topics;
private final RabbitClient rabbitClient;
private final MessageBroker messageBroker;
Expand Down Expand Up @@ -47,11 +47,16 @@ public RabbitMQ(
routingProperties
.getOutgoing()
.forEach(
(route, topicName) -> {
addQueue(topicName);
var topic = new Topic(topicName, messageBroker, tracing);
topics.put(topicName, topic);
routes.put(route, topic);
(routeName, topicNames) -> {
Route route = new Route(routeName);
topicNames.forEach(
name -> {
addQueue(name);
var topic = new Topic(name, messageBroker, tracing);
topics.put(name, topic);
route.addTopic(topic);
});
routes.put(routeName, route);
});
}

Expand All @@ -60,12 +65,12 @@ private void addQueue(String name) {
}

/**
* A topic to send messages to.
* A route (collection of topics) to send messages to.
*
* @param name a route as configured in application.yml
* @return the corresponding topic
* @return the corresponding route
*/
public Topic route(String name) {
public Route route(String name) {
return routes.get(name);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import com.github.dbmdz.flusswerk.framework.model.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

/** Flusswerk-specific abstraction, collection of topics/queues. No equivalent in RabbitMQ. */
public class Route {
private String name;
schmika marked this conversation as resolved.
Show resolved Hide resolved
private List<Topic> topics;
schmika marked this conversation as resolved.
Show resolved Hide resolved

public Route(String name) {
this.name = name;
topics = new ArrayList<>();
}

public Route(String name, List<Topic> topics) {
this.name = name;
this.topics = topics;
}

public void addTopic(Topic topic) {
this.topics.add(topic);
}

/**
* Sends a message to the topics on this route. The message will have a tracing id either based on
* the incoming message or newly generated for applications that do not work on incoming messages.
* In that case, every time you call this method it creates a new tracing path.
*
* @param message The message to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
public void send(Message message) throws IOException {
for (Topic topic : topics) {
topic.send(message);
}
}

/**
* Sends multiple messages to the topics on this route.
*
* @param messages The messages to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
public void send(Collection<Message> messages) throws IOException {
for (Topic topic : topics) {
topic.send(messages);
}
}

/**
* Convenience implementation, mostly for tests.
*
* @param messages The messages to send.
* @throws IOException If communication with RabbitMQ fails or if the message cannot be serialized
* to JSON.
*/
public void send(Message... messages) throws IOException {
send(List.of(messages));
}

/**
* Sends a bunch of bytes to RabbitMQ.
*
* <p><b>Use with caution and only when using {@link Message} is not viable.</b>
*
* @param message The message serialized to bytes
*/
public void sendRaw(byte[] message) {
for (Topic topic : topics) {
topic.sendRaw(message);
}
}

public String getName() {
return this.name;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o instanceof Route route) {
return Objects.equals(topics, route.topics);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(topics);
}

@Override
public String toString() {
return "Route{name='" + name + "', topics='" + topics + "'}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void valuesOfRouting() {
assertThat(routing)
.hasFieldOrPropertyWithValue("defaultExchange", "my.exchange")
.hasFieldOrPropertyWithValue("incoming", List.of("first", "second"))
.hasFieldOrPropertyWithValue("outgoing", Map.of("default", "default.queue.to.write.to"));
.hasFieldOrPropertyWithValue(
"outgoing", Map.of("default", List.of("default.queue.to.write.to")));

assertThat(routing.getExchange("second")).isEqualTo("other.exchange");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void shouldReturnSpecificDlx() {
void shouldProvideValidMinimalConfig() {
var routingConfig =
RoutingProperties.minimal(
List.of("some.input.queue"), Map.of("default", "some.output.queue"));
List.of("some.input.queue"), Map.of("default", List.of("some.output.queue")));
assertThat(routingConfig.getExchange("some.input.queue")).isEqualTo(DEFAULT_EXCHANGE);
assertThat(routingConfig.getDeadLetterExchange("some.input.queue"))
.isEqualTo(RoutingProperties.defaultDeadLetterExchange(DEFAULT_EXCHANGE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ void setUp() throws IOException {
failurePolicy = new FailurePolicy("some.input.queue");
routing =
RoutingProperties.minimal(
List.of("some.input.queue"), Map.of("default", "some.output.queue"));
List.of("some.input.queue"),
Map.of("default", List.of("some.output.queue", "another.output.queue")));

rabbitClient = mock(RabbitClient.class);
messageBroker = new MessageBroker(routing, rabbitClient);
Expand Down Expand Up @@ -87,7 +88,7 @@ void rejectShouldRouteToFailedQueueIfMessageIsRejectedTooOften() throws IOExcept
@DisplayName("Should send a message to the output queue")
void sendShouldRouteMessageToOutputQueue() throws IOException {
messageBroker.send(new Message());
verify(rabbitClient).send(any(), eq(routing.getOutgoing().get("default")), any());
verify(rabbitClient).send(any(), eq(routing.getOutgoing().get("default").get(0)), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
class RabbitMQTest {

private static final List<String> incoming = List.of("first.incoming", "first.incoming");
private static final Map<String, String> outgoing =
Map.of("first.rout", "first.outgoing", "second.route", "second.outgoing");
private static final Map<String, List<String>> outgoing =
Map.of(
"first.route",
List.of("first.outgoing"),
"second.route",
List.of("second.outgoing", "another.outgoing"));

private RabbitClient rabbitClient;
private RabbitMQ rabbitMQ;
Expand All @@ -34,7 +38,7 @@ private static Stream<Arguments> routesAndTopics() {
}

private static Stream<String> topics() {
return Stream.concat(incoming.stream(), outgoing.values().stream());
return Stream.concat(incoming.stream(), outgoing.values().stream().flatMap(List::stream));
}

private static Stream<String> queues() {
Expand All @@ -52,8 +56,13 @@ void setUp() {
@DisplayName("should provide matching topics for routes")
@ParameterizedTest
@MethodSource("routesAndTopics")
void shouldProvideMatchingTopicsForRoutes(String route, String topic) {
var expected = new Topic(topic, mock(MessageBroker.class), tracing);
void shouldProvideMatchingTopicsForRoutes(String route, List<String> topics) {
var expected =
new Route(
route,
topics.stream()
.map(topic -> new Topic(topic, mock(MessageBroker.class), tracing))
.toList());
assertThat(rabbitMQ.route(route)).isEqualTo(expected);
}

Expand Down
Loading