Skip to content

Commit

Permalink
Update dependency io.quarkiverse.reactivemessaging.nats-jetstream:qua…
Browse files Browse the repository at this point in the history
…rkus-messaging-nats-jetstream to v3.17.7 (#10052)

* Update dependency io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-messaging-nats-jetstream to v3.17.7

* fix nats example

---------

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: Alexandre Dutra <[email protected]>
  • Loading branch information
renovate[bot] and adutra authored Dec 11, 2024
1 parent 5e0dc21 commit c8502ad
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 35 deletions.
2 changes: 1 addition & 1 deletion events/ri/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies {

// Quarkus - NATS
implementation(
"io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-messaging-nats-jetstream:3.17.1"
"io.quarkiverse.reactivemessaging.nats-jetstream:quarkus-messaging-nats-jetstream:3.17.7"
)

// Avro serialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package org.projectnessie.events.ri.messaging.nats;

import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamOutgoingMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.SubscribeMessageMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -48,53 +48,48 @@ public AbstractNatsEventSubscriber(Emitter<T> emitter, EventSubscriberConfig con
protected Message<T> createMessage(Event upstreamEvent, T messagePayload) {
Map<String, List<String>> headers = new HashMap<>();
createHeaders(upstreamEvent, (name, value) -> headers.put(name, List.of(value)));
JetStreamOutgoingMessageMetadata metadata =
JetStreamOutgoingMessageMetadata.of(
upstreamEvent.getIdAsText(), headers, subtopic(upstreamEvent));
SubscribeMessageMetadata metadata =
SubscribeMessageMetadata.builder()
.messageId(upstreamEvent.getIdAsText())
.headers(headers)
.subject(subject(upstreamEvent))
.build();
return Message.of(messagePayload, Metadata.of(metadata));
}

@Override
protected CompletionStage<Void> onWriteAck(Metadata metadata) {
// Do NOT enable this log statement in production!
if (LOGGER.isDebugEnabled()) {
JetStreamOutgoingMessageMetadata jetStreamMetadata =
metadata.get(JetStreamOutgoingMessageMetadata.class).orElseThrow();
SubscribeMessageMetadata jetStreamMetadata =
metadata.get(SubscribeMessageMetadata.class).orElseThrow();
String id = jetStreamMetadata.messageId();
String subtopic = jetStreamMetadata.subtopic().orElse("<?>");
LOGGER.debug("Event written: messageId={}, subtopic={}", id, subtopic);
String subject = jetStreamMetadata.subjectOptional().orElse("<?>");
LOGGER.debug("Event written: messageId={}, subject={}", id, subject);
}
return CompletableFuture.completedFuture(null); // immediate ack
}

@Override
protected CompletionStage<Void> onWriteNack(Throwable error, Metadata metadata) {
JetStreamOutgoingMessageMetadata jetStreamMetadata =
metadata.get(JetStreamOutgoingMessageMetadata.class).orElseThrow();
SubscribeMessageMetadata jetStreamMetadata =
metadata.get(SubscribeMessageMetadata.class).orElseThrow();
String id = jetStreamMetadata.messageId();
String subtopic = jetStreamMetadata.subtopic().orElse("<?>");
LOGGER.error("Failed to write event: messageId={}, subtopic={}", id, subtopic, error);
String subject = jetStreamMetadata.subjectOptional().orElse("<?>");
LOGGER.error("Failed to write event: messageId={}, subject={}", id, subject, error);
return CompletableFuture.completedFuture(null); // immediate ack
}

/**
* Determine the subtopic for the given event. The subtopic is appended to the configured NATS
* root subject (see application.properties) to form the final subject to which the event is
* published.
*
* @param event The event for which to determine the subtopic.
* @return The subtopic for the given event.
*/
public static String subtopic(Event event) {
public static String subject(Event event) {
return switch (event.getType()) {
case MERGE -> "merge";
case TRANSPLANT -> "transplant";
case COMMIT -> "commit";
case CONTENT_STORED -> "commit.content.stored";
case CONTENT_REMOVED -> "commit.content.removed";
case REFERENCE_CREATED -> "reference.created";
case REFERENCE_UPDATED -> "reference.updated";
case REFERENCE_DELETED -> "reference.deleted";
case MERGE -> "nessie.events.merge";
case TRANSPLANT -> "nessie.events.transplant";
case COMMIT -> "nessie.events.commit";
case CONTENT_STORED -> "nessie.events.commit.content.stored";
case CONTENT_REMOVED -> "nessie.events.commit.content.removed";
case REFERENCE_CREATED -> "nessie.events.reference.created";
case REFERENCE_UPDATED -> "nessie.events.reference.updated";
case REFERENCE_DELETED -> "nessie.events.reference.deleted";
};
}
}
3 changes: 3 additions & 0 deletions events/ri/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ quarkus.log.category."com.github.dockerjava".level=INFO
%test.mp.messaging.incoming.nessie-nats-json-consumer.max-deliver=1
%test.mp.messaging.incoming.nessie-nats-json-consumer.durable=nessie-nats-json-consumer

# noisy logger
%test.quarkus.log.category."io.quarkiverse.reactive.messaging.nats.jetstream.client.ReaderSubscription".level=ERROR

# DevServices configuration

quarkus.kafka.devservices.topic-partitions.nessie-events-avro=1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.projectnessie.events.ri.messaging.MessageHeaders.REPOSITORY_ID;
import static org.projectnessie.events.ri.messaging.MessageHeaders.SPEC_VERSION;

import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamIncomingMessageMetadata;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PublishMessageMetadata;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import java.util.List;
Expand Down Expand Up @@ -55,11 +55,11 @@ protected Message<Event> receive() {

@Override
protected void checkMessage(Message<Event> message, Event expectedPayload) {
JetStreamIncomingMessageMetadata metadata =
message.getMetadata().get(JetStreamIncomingMessageMetadata.class).orElseThrow();
PublishMessageMetadata metadata =
message.getMetadata().get(PublishMessageMetadata.class).orElseThrow();
assertThat(metadata.messageId()).isEqualTo(expectedPayload.getIdAsText());
String subtopic = AbstractNatsEventSubscriber.subtopic(expectedPayload);
assertThat(metadata.subject()).contains("nessie.events." + subtopic);
String subject = AbstractNatsEventSubscriber.subject(expectedPayload);
assertThat(metadata.subject()).isEqualTo(subject);
checkHeaders(metadata.headers(), expectedPayload);
assertThat(message.getPayload()).isEqualTo(expectedPayload);
}
Expand Down

0 comments on commit c8502ad

Please sign in to comment.