From 8236f1b39a944b31e0cc3ea63b29c259025d309b Mon Sep 17 00:00:00 2001 From: Tomasz Maruszak Date: Sun, 28 Apr 2024 12:26:26 +0200 Subject: [PATCH] $Default rule is not added to subscription #254 Signed-off-by: Tomasz Maruszak --- src/Host.Plugin.Properties.xml | 2 +- .../ServiceBusTopologyService.cs | 60 ++++++++++--------- .../ServiceBusTopologySettings.cs | 12 ++++ .../ServiceBusMessageBusIt.cs | 37 ++++++++++-- .../ServiceBusTopologyServiceTests.cs | 5 +- 5 files changed, 80 insertions(+), 36 deletions(-) diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml index b3abb6c6..4f188e3d 100644 --- a/src/Host.Plugin.Properties.xml +++ b/src/Host.Plugin.Properties.xml @@ -4,7 +4,7 @@ - 2.3.3 + 2.3.4 \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs index 1d142712..0be23396 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs @@ -12,10 +12,10 @@ public class ServiceBusTopologyService public ServiceBusTopologyService(ILogger logger, MessageBusSettings settings, ServiceBusMessageBusSettings providerSettings) { - this._logger = logger; - this._settings = settings; - this._providerSettings = providerSettings; - this._adminClient = providerSettings.AdminClientFactory(); + _logger = logger; + _settings = settings; + _providerSettings = providerSettings; + _adminClient = providerSettings.AdminClientFactory(); } [Flags] @@ -153,7 +153,9 @@ private Task TryUpdateRule(string path, string subscript return TopologyCreationStatus.Exists | TopologyCreationStatus.Updated; }); - public async Task ProvisionTopology() + public Task ProvisionTopology() => _providerSettings.TopologyProvisioning.OnProvisionTopology(_adminClient, DoProvisionTopology); + + protected async Task DoProvisionTopology() { try { @@ -244,47 +246,51 @@ void ThrowOnFalse(bool value, string settingName) return options; }); - if ((subscriptionStatus & TopologyCreationStatus.Exists) != 0) + if ((subscriptionStatus & TopologyCreationStatus.Exists) != 0 && (topologyProvisioning.CanConsumerValidateSubscriptionFilters || topologyProvisioning.CanConsumerCreateSubscriptionFilter || topologyProvisioning.CanConsumerReplaceSubscriptionFilters)) { - if ((subscriptionStatus & TopologyCreationStatus.Created) != 0 && topologyProvisioning.CanConsumerCreateSubscriptionFilter) - { - // Note: for a newly created subscription, ASB creates a default filter automatically, we need to remove it and let the user defined rules take over - await _adminClient.DeleteRuleAsync(path, subscriptionName, RuleProperties.DefaultRuleName); - } - var tasks = new List(); - if (topologyProvisioning.CanConsumerValidateSubscriptionFilters || topologyProvisioning.CanConsumerCreateSubscriptionFilter || topologyProvisioning.CanConsumerReplaceSubscriptionFilters) - { - var rules = MergeFilters(path, subscriptionName, consumerSettingsGroup).ToDictionary(x => x.Name, x => x); + var rules = MergeFilters(path, subscriptionName, consumerSettingsGroup).ToDictionary(x => x.Name, x => x); - await foreach (var page in _adminClient.GetRulesAsync(path, subscriptionName).AsPages()) + await foreach (var page in _adminClient.GetRulesAsync(path, subscriptionName).AsPages()) + { + foreach (var serviceRule in page.Values) { - foreach (var serviceRule in page.Values) + if (!rules.TryGetValue(serviceRule.Name, out var rule)) { - if (!rules.TryGetValue(serviceRule.Name, out var rule)) + // server rule was not defined in SMB + if ((rules.Count > 0 || serviceRule.Name != RuleProperties.DefaultRuleName) && ((subscriptionStatus & TopologyCreationStatus.Created) != 0 || topologyProvisioning.CanConsumerReplaceSubscriptionFilters)) { + // Note: for a newly created subscription, ASB creates a $Default filter automatically + // We need to remove the filter if its not matching what is declared in SMB and let the user defined rules take over + // On the other hand if there are no user defined rules, we need to preserve the $Default filter tasks.Add(TryDeleteRule(path, subscriptionName, serviceRule.Name)); - continue; } + continue; + } - if (rule.Filter.Equals(serviceRule.Filter) - && ((rule.Action == null && serviceRule.Action == null) || (rule.Action.Equals(serviceRule.Action)))) - { - // is as expected / nothing to do - rules.Remove(serviceRule.Name); - continue; - } + if (rule.Filter.Equals(serviceRule.Filter) && ((rule.Action == null && serviceRule.Action == null) || (rule.Action.Equals(serviceRule.Action)))) + { + // server rule matched what is defined in SMB + rules.Remove(serviceRule.Name); + continue; + } + if (topologyProvisioning.CanConsumerReplaceSubscriptionFilters) + { + // update existing rule serviceRule.Filter = rule.Filter; serviceRule.Action = rule.Action; tasks.Add(TryUpdateRule(path, subscriptionName, serviceRule)); rules.Remove(serviceRule.Name); } + } + if (topologyProvisioning.CanConsumerCreateSubscriptionFilter) + { tasks.AddRange(rules.Values.Select(options => TryCreateRule(path, subscriptionName, options))); - await Task.WhenAll(tasks); } } + await Task.WhenAll(tasks); } } } diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologySettings.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologySettings.cs index 9fe6a81e..045c8b83 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologySettings.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologySettings.cs @@ -54,4 +54,16 @@ public class ServiceBusTopologySettings /// Default configuration to be applied when a rule needs to be created (). /// public Action CreateSubscriptionFilterOptions { get; set; } + /// + /// Interceptor that allows to intercept the topology provisioning process. + /// + public ServiceBusTopologyInterceptor OnProvisionTopology { get; set; } = (client, provision) => provision(); } + +/// +/// Interceptor that allows to intercept the topology provisioning process and to apply custom logic before and after the provisioning process. +/// +/// Service Bus admin client +/// Delegate allowing to perform topology provisioning +/// +public delegate Task ServiceBusTopologyInterceptor(ServiceBusAdministrationClient client, Func provision); diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs index 5a2996a8..472b332c 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs @@ -5,6 +5,7 @@ namespace SlimMessageBus.Host.AzureServiceBus.Test; using System.Globalization; using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -15,19 +16,28 @@ namespace SlimMessageBus.Host.AzureServiceBus.Test; using SlimMessageBus.Host.Test.Common.IntegrationTest; [Trait("Category", "Integration")] -public class ServiceBusMessageBusIt : BaseIntegrationTest +public class ServiceBusMessageBusIt(ITestOutputHelper testOutputHelper) : BaseIntegrationTest(testOutputHelper) { private const int NumberOfMessages = 77; - public ServiceBusMessageBusIt(ITestOutputHelper testOutputHelper) : base(testOutputHelper) - { - } + private Func CleanTopology { get; set; } protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration) { services.AddSlimMessageBus((mbb) => { - mbb.WithProviderServiceBus(cfg => cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"])); + mbb.WithProviderServiceBus(cfg => + { + cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]); + cfg.TopologyProvisioning.OnProvisionTopology = async (client, next) => + { + if (CleanTopology != null) + { + await CleanTopology(client); + } + await next(); + }; + }); mbb.AddServicesFromAssemblyContaining(); mbb.AddJsonSerializer(); ApplyBusConfiguration(mbb); @@ -76,6 +86,11 @@ public async Task BasicPubSubOnTopic() })); }); + CleanTopology = async client => + { + await client.DeleteTopicAsync(topic); + }; + await BasicPubSub(concurrency, subscribers, subscribers); } @@ -95,6 +110,12 @@ public async Task BasicPubSubOnQueue() .WithConsumer() .Instances(concurrency)); }); + + CleanTopology = async client => + { + await client.DeleteQueueAsync(queue); + }; + await BasicPubSub(concurrency, 1, 1); } @@ -114,6 +135,12 @@ public async Task BasicPubSubWithCustomConsumerOnQueue() .WithConsumer(typeof(CustomPingConsumer), typeof(PingDerivedMessage), nameof(CustomPingConsumer.Handle)) .Instances(concurrency)); }); + + CleanTopology = async client => + { + await client.DeleteQueueAsync(queue); + }; + await BasicPubSub(concurrency, 1, 1); } diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusTopologyServiceTests.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusTopologyServiceTests.cs index 2bb54ca4..b9149464 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusTopologyServiceTests.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusTopologyServiceTests.cs @@ -156,7 +156,7 @@ public async Task When_SubscriptionIsSharedBetweenConsumers_But_ConfigurationDif } [Fact] - public async Task When_SubscriptionIsCreated_AndCanCreateSubscriptionFilter_Then_DeleteDefaultRule() + public async Task When_SubscriptionIsCreated_AndCanCreateSubscriptionFilter_AndNoOtherRulesDeclared_Then_NeverDeleteDefaultRule() { // arrange ProviderBusSettings.TopologyProvisioning.CanConsumerCreateSubscription = true; @@ -165,14 +165,13 @@ public async Task When_SubscriptionIsCreated_AndCanCreateSubscriptionFilter_Then _mockAdminClient.Setup(x => x.TopicExistsAsync(_topicName, It.IsAny())).Returns(ResponseTask(true)); _mockAdminClient.Setup(x => x.SubscriptionExistsAsync(It.IsAny(), It.IsAny(), It.IsAny())).Returns(ResponseTask(false)); _mockAdminClient.Setup(x => x.CreateSubscriptionAsync(It.IsAny(), It.IsAny())); - _mockAdminClient.Setup(x => x.DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName, It.IsAny())).Verifiable(); // act await _target.ProvisionTopology(); // assert _mockAdminClient.Verify(x => x.CreateSubscriptionAsync(It.IsAny(), It.IsAny()), Times.Once); - _mockAdminClient.Verify(x => x.DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName, It.IsAny()), Times.Once); + _mockAdminClient.Verify(x => x.DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName, It.IsAny()), Times.Never); } [Fact]