Skip to content

Commit

Permalink
Rework an observation for Rabbit Binder
Browse files Browse the repository at this point in the history
The observation propagation doesn't work in multi-binder configuration

* Remove `ObservationAutoConfiguration` since it is not visible in case of multi-binder configuration
* Instead move `setObservationEnabled` flag setting to the `RabbitMessageChannelBinder`
* Add `io.micrometer.observation.ObservationRegistry` into `shared.beans` to make it visible for binder-specific application context
* Add `RabbitMultiBinderObservationTests` integration test where Rabbit Binder is in multi-binder environment

As a side effect this fixes an observation propagation for Kafka binder as well in the multi-binder environment.
Its configuration is OK, but an `ObservationRegistry` must make it visible for the binder-specific application context.
See the mentioned `shared.beans`

Related to spring-cloud#2901
Also see spring-cloud#2902 for possible evolution
  • Loading branch information
artembilan committed Feb 14, 2024
1 parent 7fa7228 commit f48e4a1
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 70 deletions.
32 changes: 32 additions & 0 deletions binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,38 @@
<version>1.17.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-integration-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>io.opentelemetry</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.wavefront</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-otel</artifactId>
</exclusion>
<exclusion>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-reporter-wavefront</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<!-- TODO: upgrade to httpclient 5 -->
<groupId>org.apache.httpcomponents</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import io.micrometer.observation.ObservationRegistry;
import jakarta.validation.constraints.NotNull;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
Expand All @@ -54,6 +55,7 @@
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
Expand Down Expand Up @@ -87,6 +89,7 @@
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.Expression;
Expand Down Expand Up @@ -500,7 +503,7 @@ protected MessageProducer createConsumerEndpoint(
"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
String destination = consumerDestination.getName();
RabbitConsumerProperties extension = properties.getExtension();
MessageListenerContainer listenerContainer = createAndConfigureContainer(consumerDestination, group,
ObservableListenerContainer listenerContainer = createAndConfigureContainer(consumerDestination, group,
properties, destination, extension);
String[] queues = StringUtils.tokenizeToStringArray(destination, ",", true, true);
if (properties.getExtension().getContainerType() != ContainerType.STREAM
Expand All @@ -509,6 +512,10 @@ protected MessageProducer createConsumerEndpoint(
}
getContainerCustomizer().configure(listenerContainer,
consumerDestination.getName(), group);
// TODO until https://github.com/spring-cloud/spring-cloud-stream/issues/2902
getApplicationContext().getBeanProvider(ObservationRegistry.class)
.ifAvailable((observationRegistry) -> listenerContainer.setObservationEnabled(true));

listenerContainer.afterPropertiesSet();

AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
Expand Down Expand Up @@ -540,7 +547,7 @@ protected MessageProducer createConsumerEndpoint(
return adapter;
}

private MessageListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination,
private ObservableListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination,
String group, ExtendedConsumerProperties<RabbitConsumerProperties> properties, String destination,
RabbitConsumerProperties extension) {

Expand Down Expand Up @@ -597,6 +604,7 @@ else if (getApplicationContext() != null) {
q -> extension.getConsumerTagPrefix() + "#"
+ index.getAndIncrement());
}
listenerContainer.setApplicationContext(getApplicationContext());
return listenerContainer;
}

Expand Down Expand Up @@ -1048,6 +1056,11 @@ private RabbitTemplate buildRabbitTemplate(RabbitProducerProperties properties,
retryTemplate.setBackOffPolicy(backOff);
rabbitTemplate.setRetryTemplate(retryTemplate);
}
// TODO until https://github.com/spring-cloud/spring-cloud-stream/issues/2902
AbstractApplicationContext applicationContext = getApplicationContext();
applicationContext.getBeanProvider(ObservationRegistry.class)
.ifAvailable((observationRegistry) -> rabbitTemplate.setObservationEnabled(true));
rabbitTemplate.setApplicationContext(applicationContext);
rabbitTemplate.afterPropertiesSet();
return rabbitTemplate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.rabbitmq.stream.Environment;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
Expand Down Expand Up @@ -52,6 +52,7 @@
* spring-rabbit-stream.
*
* @author Gary Russell
* @author Artem Bilan
* @since 3.2
*
*/
Expand All @@ -67,11 +68,10 @@ private StreamUtils() {
* @param group the group.
* @param properties the properties.
* @param destination the destination.
* @param extension the properties extension.
* @param applicationContext the application context.
* @return the container.
*/
public static MessageListenerContainer createContainer(ConsumerDestination consumerDestination, String group,
public static ObservableListenerContainer createContainer(ConsumerDestination consumerDestination, String group,
ExtendedConsumerProperties<RabbitConsumerProperties> properties, String destination,
ApplicationContext applicationContext) {

Expand Down Expand Up @@ -143,7 +143,7 @@ public void fromHeadersToReply(MessageHeaders headers, MessageProperties target)
* @param errorChannel the error channel
* @param destination the destination.
* @param extendedProperties the extended properties.
* @param abstractApplicationContext the application context.
* @param applicationContext the application context.
* @param headerMapperFunction the header mapper function.
* @return the handler.
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration
org.springframework.cloud.stream.binder.rabbit.config.ObservationAutoConfiguration
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.rabbit.integration;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import brave.handler.SpanHandler;
import brave.test.TestSpanHandler;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.brave.bridge.BraveFinishedSpan;
import io.micrometer.tracing.test.simple.SpansAssert;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.rabbit.RabbitTestContainer;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
* @author Artem Bilan
* @since 4.1.1
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
args = "--spring.config.location=classpath:/rabbit-multi-binder-observation.yml")
@DirtiesContext
@AutoConfigureObservability
public class RabbitMultiBinderObservationTests {

private static final TestSpanHandler SPANS = new TestSpanHandler();

private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance();

@Autowired
StreamBridge streamBridge;

@Autowired
ObservationRegistry observationRegistry;

@Autowired
TestConfiguration testConfiguration;

@DynamicPropertySource
static void rabbitProperties(DynamicPropertyRegistry registry) {
registry.add("spring.rabbitmq.port", RABBITMQ::getAmqpPort);
}

@Test
void observationIsPropagatedInMultiBinderConfiguration() throws InterruptedException {
Observation.createNotStarted("test parent observation", this.observationRegistry)
.observe(() -> this.streamBridge.send("test-out-0", "test data"));

assertThat(this.testConfiguration.messageReceived.await(10, TimeUnit.SECONDS)).isTrue();

// There is a race condition when we already have a reply, but the span in the
// Rabbit listener is not closed yet.
// parent -> StreamBridge -> RabbitTemplate -> Rabbit Listener -> Consumer
await().untilAsserted(() -> assertThat(SPANS.spans()).hasSize(5));
SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
.haveSameTraceId();
}

@SpringBootConfiguration
@EnableAutoConfiguration
public static class TestConfiguration {

final CountDownLatch messageReceived = new CountDownLatch(1);

@Bean
SpanHandler testSpanHandler() {
return SPANS;
}

@Bean
public Consumer<Message<?>> testListener() {
return message -> this.messageReceived.countDown();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
spring:
cloud:
function:
definition: testListener
stream:
output-bindings: test
bindings:
test-out-0:
binder: rabbit
destination: test
group: test
testListener-in-0:
binder: rabbit
destination: test
group: test
binders:
rabbit:
type: rabbit
environment:
spring:
cloud:
stream:
rabbit:
binder:
enableObservation: true
logging:
pattern:
level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"

management:
tracing:
sampling:
probability: 1

Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer
org.springframework.cloud.stream.binder.test.InputDestination
org.springframework.cloud.stream.binder.test.OutputDestination
org.springframework.cloud.stream.config.BindingHandlerAdvise
io.micrometer.observation.ObservationRegistry

0 comments on commit f48e4a1

Please sign in to comment.