Skip to content

Commit

Permalink
$Default rule is not added to subscription #254
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Apr 28, 2024
1 parent 1b74fea commit 8236f1b
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.3.3</Version>
<Version>2.3.4</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ public class ServiceBusTopologyService

public ServiceBusTopologyService(ILogger<ServiceBusTopologyService> logger, MessageBusSettings settings, ServiceBusMessageBusSettings providerSettings)
{
this._logger = logger;
this._settings = settings;
this._providerSettings = providerSettings;
this._adminClient = providerSettings.AdminClientFactory();
_logger = logger;
_settings = settings;
_providerSettings = providerSettings;
_adminClient = providerSettings.AdminClientFactory();
}

[Flags]
Expand Down Expand Up @@ -153,7 +153,9 @@ private Task<TopologyCreationStatus> TryUpdateRule(string path, string subscript
return TopologyCreationStatus.Exists | TopologyCreationStatus.Updated;
});

public async Task ProvisionTopology()
public Task ProvisionTopology() => _providerSettings.TopologyProvisioning.OnProvisionTopology(_adminClient, DoProvisionTopology);

protected async Task DoProvisionTopology()
{
try
{
Expand Down Expand Up @@ -244,47 +246,51 @@ void ThrowOnFalse(bool value, string settingName)
return options;
});

if ((subscriptionStatus & TopologyCreationStatus.Exists) != 0)
if ((subscriptionStatus & TopologyCreationStatus.Exists) != 0 && (topologyProvisioning.CanConsumerValidateSubscriptionFilters || topologyProvisioning.CanConsumerCreateSubscriptionFilter || topologyProvisioning.CanConsumerReplaceSubscriptionFilters))
{
if ((subscriptionStatus & TopologyCreationStatus.Created) != 0 && topologyProvisioning.CanConsumerCreateSubscriptionFilter)
{
// Note: for a newly created subscription, ASB creates a default filter automatically, we need to remove it and let the user defined rules take over
await _adminClient.DeleteRuleAsync(path, subscriptionName, RuleProperties.DefaultRuleName);
}

var tasks = new List<Task>();
if (topologyProvisioning.CanConsumerValidateSubscriptionFilters || topologyProvisioning.CanConsumerCreateSubscriptionFilter || topologyProvisioning.CanConsumerReplaceSubscriptionFilters)
{
var rules = MergeFilters(path, subscriptionName, consumerSettingsGroup).ToDictionary(x => x.Name, x => x);
var rules = MergeFilters(path, subscriptionName, consumerSettingsGroup).ToDictionary(x => x.Name, x => x);

await foreach (var page in _adminClient.GetRulesAsync(path, subscriptionName).AsPages())
await foreach (var page in _adminClient.GetRulesAsync(path, subscriptionName).AsPages())
{
foreach (var serviceRule in page.Values)
{
foreach (var serviceRule in page.Values)
if (!rules.TryGetValue(serviceRule.Name, out var rule))
{
if (!rules.TryGetValue(serviceRule.Name, out var rule))
// server rule was not defined in SMB
if ((rules.Count > 0 || serviceRule.Name != RuleProperties.DefaultRuleName) && ((subscriptionStatus & TopologyCreationStatus.Created) != 0 || topologyProvisioning.CanConsumerReplaceSubscriptionFilters))
{
// Note: for a newly created subscription, ASB creates a $Default filter automatically
// We need to remove the filter if its not matching what is declared in SMB and let the user defined rules take over
// On the other hand if there are no user defined rules, we need to preserve the $Default filter
tasks.Add(TryDeleteRule(path, subscriptionName, serviceRule.Name));
continue;
}
continue;
}

if (rule.Filter.Equals(serviceRule.Filter)
&& ((rule.Action == null && serviceRule.Action == null) || (rule.Action.Equals(serviceRule.Action))))
{
// is as expected / nothing to do
rules.Remove(serviceRule.Name);
continue;
}
if (rule.Filter.Equals(serviceRule.Filter) && ((rule.Action == null && serviceRule.Action == null) || (rule.Action.Equals(serviceRule.Action))))
{
// server rule matched what is defined in SMB
rules.Remove(serviceRule.Name);
continue;
}

if (topologyProvisioning.CanConsumerReplaceSubscriptionFilters)
{
// update existing rule
serviceRule.Filter = rule.Filter;
serviceRule.Action = rule.Action;
tasks.Add(TryUpdateRule(path, subscriptionName, serviceRule));
rules.Remove(serviceRule.Name);
}
}

if (topologyProvisioning.CanConsumerCreateSubscriptionFilter)
{
tasks.AddRange(rules.Values.Select(options => TryCreateRule(path, subscriptionName, options)));
await Task.WhenAll(tasks);
}
}
await Task.WhenAll(tasks);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,16 @@ public class ServiceBusTopologySettings
/// Default configuration to be applied when a rule needs to be created (<see cref="CreateSubscriptionFilterOptions"/>).
/// </summary>
public Action<CreateRuleOptions> CreateSubscriptionFilterOptions { get; set; }
/// <summary>
/// Interceptor that allows to intercept the topology provisioning process.
/// </summary>
public ServiceBusTopologyInterceptor OnProvisionTopology { get; set; } = (client, provision) => provision();
}

/// <summary>
/// Interceptor that allows to intercept the topology provisioning process and to apply custom logic before and after the provisioning process.
/// </summary>
/// <param name="client">Service Bus admin client</param>
/// <param name="provision">Delegate allowing to perform topology provisioning</param>
/// <returns></returns>
public delegate Task ServiceBusTopologyInterceptor(ServiceBusAdministrationClient client, Func<Task> provision);
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace SlimMessageBus.Host.AzureServiceBus.Test;
using System.Globalization;

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -15,19 +16,28 @@ namespace SlimMessageBus.Host.AzureServiceBus.Test;
using SlimMessageBus.Host.Test.Common.IntegrationTest;

[Trait("Category", "Integration")]
public class ServiceBusMessageBusIt : BaseIntegrationTest<ServiceBusMessageBusIt>
public class ServiceBusMessageBusIt(ITestOutputHelper testOutputHelper) : BaseIntegrationTest<ServiceBusMessageBusIt>(testOutputHelper)
{
private const int NumberOfMessages = 77;

public ServiceBusMessageBusIt(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
}
private Func<ServiceBusAdministrationClient, Task> CleanTopology { get; set; }

protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration)
{
services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderServiceBus(cfg => cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]));
mbb.WithProviderServiceBus(cfg =>
{
cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]);
cfg.TopologyProvisioning.OnProvisionTopology = async (client, next) =>
{
if (CleanTopology != null)
{
await CleanTopology(client);
}
await next();
};
});
mbb.AddServicesFromAssemblyContaining<PingConsumer>();
mbb.AddJsonSerializer();
ApplyBusConfiguration(mbb);
Expand Down Expand Up @@ -76,6 +86,11 @@ public async Task BasicPubSubOnTopic()
}));
});

CleanTopology = async client =>
{
await client.DeleteTopicAsync(topic);
};

await BasicPubSub(concurrency, subscribers, subscribers);
}

Expand All @@ -95,6 +110,12 @@ public async Task BasicPubSubOnQueue()
.WithConsumer<PingDerivedConsumer, PingDerivedMessage>()
.Instances(concurrency));
});

CleanTopology = async client =>
{
await client.DeleteQueueAsync(queue);
};

await BasicPubSub(concurrency, 1, 1);
}

Expand All @@ -114,6 +135,12 @@ public async Task BasicPubSubWithCustomConsumerOnQueue()
.WithConsumer(typeof(CustomPingConsumer), typeof(PingDerivedMessage), nameof(CustomPingConsumer.Handle))
.Instances(concurrency));
});

CleanTopology = async client =>
{
await client.DeleteQueueAsync(queue);
};

await BasicPubSub(concurrency, 1, 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public async Task When_SubscriptionIsSharedBetweenConsumers_But_ConfigurationDif
}

[Fact]
public async Task When_SubscriptionIsCreated_AndCanCreateSubscriptionFilter_Then_DeleteDefaultRule()
public async Task When_SubscriptionIsCreated_AndCanCreateSubscriptionFilter_AndNoOtherRulesDeclared_Then_NeverDeleteDefaultRule()
{
// arrange
ProviderBusSettings.TopologyProvisioning.CanConsumerCreateSubscription = true;
Expand All @@ -165,14 +165,13 @@ public async Task When_SubscriptionIsCreated_AndCanCreateSubscriptionFilter_Then
_mockAdminClient.Setup(x => x.TopicExistsAsync(_topicName, It.IsAny<CancellationToken>())).Returns(ResponseTask(true));
_mockAdminClient.Setup(x => x.SubscriptionExistsAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>())).Returns(ResponseTask(false));
_mockAdminClient.Setup(x => x.CreateSubscriptionAsync(It.IsAny<CreateSubscriptionOptions>(), It.IsAny<CancellationToken>()));
_mockAdminClient.Setup(x => x.DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName, It.IsAny<CancellationToken>())).Verifiable();

// act
await _target.ProvisionTopology();

// assert
_mockAdminClient.Verify(x => x.CreateSubscriptionAsync(It.IsAny<CreateSubscriptionOptions>(), It.IsAny<CancellationToken>()), Times.Once);
_mockAdminClient.Verify(x => x.DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName, It.IsAny<CancellationToken>()), Times.Once);
_mockAdminClient.Verify(x => x.DeleteRuleAsync(_topicName, _subscriptionName, RuleProperties.DefaultRuleName, It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
Expand Down

0 comments on commit 8236f1b

Please sign in to comment.