Skip to content

Commit

Permalink
fix: More robust shutdown/cleanup/reset
Browse files Browse the repository at this point in the history
Previously, we shutdown the consumer thread causing any reuse of the
Api to block on the second event emitted to the event executor.

Fixes: #186

Signed-off-by: Austin Drenski <[email protected]>
  • Loading branch information
austindrenski committed Jan 17, 2024
1 parent a6062fe commit 4c5d79f
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 94 deletions.
3 changes: 2 additions & 1 deletion build/Common.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<LangVersion>7.3</LangVersion>
<LangVersion>latest</LangVersion>
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
</PropertyGroup>
Expand All @@ -19,6 +19,7 @@
Please sort alphabetically.
Refer to https://docs.microsoft.com/nuget/concepts/package-versioning for semver syntax.
-->
<MicrosoftBclAsyncInterfacesVer>[8.0.0,)</MicrosoftBclAsyncInterfacesVer>
<MicrosoftExtensionsLoggerVer>[2.0,)</MicrosoftExtensionsLoggerVer>
<MicrosoftSourceLinkGitHubPkgVer>[1.0.0,2.0)</MicrosoftSourceLinkGitHubPkgVer>
</PropertyGroup>
Expand Down
34 changes: 23 additions & 11 deletions src/OpenFeature/Api.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ namespace OpenFeature
public sealed class Api : IEventBus
{
private EvaluationContext _evaluationContext = EvaluationContext.Empty;
private readonly ProviderRepository _repository = new ProviderRepository();
private EventExecutor _eventExecutor = new EventExecutor();
private ProviderRepository _repository = new ProviderRepository();
private readonly ConcurrentStack<Hook> _hooks = new ConcurrentStack<Hook>();

/// The reader/writer locks are not disposed because the singleton instance should never be disposed.
private readonly ReaderWriterLockSlim _evaluationContextLock = new ReaderWriterLockSlim();

internal readonly EventExecutor EventExecutor = new EventExecutor();


/// <summary>
/// Singleton instance of Api
/// </summary>
Expand All @@ -45,7 +43,7 @@ private Api() { }
/// <param name="featureProvider">Implementation of <see cref="FeatureProvider"/></param>
public async Task SetProvider(FeatureProvider featureProvider)
{
this.EventExecutor.RegisterDefaultFeatureProvider(featureProvider);
this._eventExecutor.RegisterDefaultFeatureProvider(featureProvider);
await this._repository.SetProvider(featureProvider, this.GetContext()).ConfigureAwait(false);
}

Expand All @@ -58,7 +56,7 @@ public async Task SetProvider(FeatureProvider featureProvider)
/// <param name="featureProvider">Implementation of <see cref="FeatureProvider"/></param>
public async Task SetProvider(string clientName, FeatureProvider featureProvider)
{
this.EventExecutor.RegisterClientFeatureProvider(clientName, featureProvider);
this._eventExecutor.RegisterClientFeatureProvider(clientName, featureProvider);
await this._repository.SetProvider(clientName, featureProvider, this.GetContext()).ConfigureAwait(false);
}

Expand Down Expand Up @@ -205,20 +203,28 @@ public EvaluationContext GetContext()
/// </summary>
public async Task Shutdown()
{
await this._repository.Shutdown().ConfigureAwait(false);
await this.EventExecutor.Shutdown().ConfigureAwait(false);
await using (this._eventExecutor.ConfigureAwait(false))
await using (this._repository.ConfigureAwait(false))
{
this._evaluationContext = EvaluationContext.Empty;
this._hooks.Clear();

// TODO: make these lazy to avoid extra allocations on the common cleanup path?
this._eventExecutor = new EventExecutor();
this._repository = new ProviderRepository();
}
}

/// <inheritdoc />
public void AddHandler(ProviderEventTypes type, EventHandlerDelegate handler)
{
this.EventExecutor.AddApiLevelHandler(type, handler);
this._eventExecutor.AddApiLevelHandler(type, handler);
}

/// <inheritdoc />
public void RemoveHandler(ProviderEventTypes type, EventHandlerDelegate handler)
{
this.EventExecutor.RemoveApiLevelHandler(type, handler);
this._eventExecutor.RemoveApiLevelHandler(type, handler);
}

/// <summary>
Expand All @@ -227,7 +233,13 @@ public void RemoveHandler(ProviderEventTypes type, EventHandlerDelegate handler)
/// <param name="logger">The logger to be used</param>
public void SetLogger(ILogger logger)
{
this.EventExecutor.Logger = logger;
this._eventExecutor.Logger = logger;

Check warning on line 236 in src/OpenFeature/Api.cs

View check run for this annotation

Codecov / codecov/patch

src/OpenFeature/Api.cs#L236

Added line #L236 was not covered by tests
}

internal void AddClientHandler(string client, ProviderEventTypes eventType, EventHandlerDelegate handler)
=> this._eventExecutor.AddClientHandler(client, eventType, handler);

internal void RemoveClientHandler(string client, ProviderEventTypes eventType, EventHandlerDelegate handler)
=> this._eventExecutor.RemoveClientHandler(client, eventType, handler);
}
}
102 changes: 31 additions & 71 deletions src/OpenFeature/EventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,13 @@

namespace OpenFeature
{

internal delegate Task ShutdownDelegate();

internal class EventExecutor
internal class EventExecutor : IAsyncDisposable
{
private readonly object _lockObj = new object();
public readonly Channel<object> EventChannel = Channel.CreateBounded<object>(1);
private FeatureProviderReference _defaultProvider;
private readonly Dictionary<string, FeatureProviderReference> _namedProviderReferences = new Dictionary<string, FeatureProviderReference>();
private readonly List<FeatureProviderReference> _activeSubscriptions = new List<FeatureProviderReference>();
private readonly SemaphoreSlim _shutdownSemaphore = new SemaphoreSlim(0);

private ShutdownDelegate _shutdownDelegate;
private FeatureProvider _defaultProvider;
private readonly Dictionary<string, FeatureProvider> _namedProviderReferences = new Dictionary<string, FeatureProvider>();
private readonly List<FeatureProvider> _activeSubscriptions = new List<FeatureProvider>();

private readonly Dictionary<ProviderEventTypes, List<EventHandlerDelegate>> _apiHandlers = new Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>();
private readonly Dictionary<string, Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>> _clientHandlers = new Dictionary<string, Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>>();
Expand All @@ -32,11 +26,12 @@ internal class EventExecutor
public EventExecutor()
{
this.Logger = new Logger<EventExecutor>(new NullLoggerFactory());
this._shutdownDelegate = this.SignalShutdownAsync;
var eventProcessing = new Thread(this.ProcessEventAsync);
eventProcessing.Start();
}

public ValueTask DisposeAsync() => new(this.Shutdown());

internal void AddApiLevelHandler(ProviderEventTypes eventType, EventHandlerDelegate handler)
{
lock (this._lockObj)
Expand Down Expand Up @@ -114,7 +109,7 @@ internal void RegisterDefaultFeatureProvider(FeatureProvider provider)
{
var oldProvider = this._defaultProvider;

this._defaultProvider = new FeatureProviderReference(provider);
this._defaultProvider = provider;

this.StartListeningAndShutdownOld(this._defaultProvider, oldProvider);
}
Expand All @@ -128,8 +123,8 @@ internal void RegisterClientFeatureProvider(string client, FeatureProvider provi
}
lock (this._lockObj)
{
var newProvider = new FeatureProviderReference(provider);
FeatureProviderReference oldProvider = null;
var newProvider = provider;
FeatureProvider oldProvider = null;
if (this._namedProviderReferences.TryGetValue(client, out var foundOldProvider))
{
oldProvider = foundOldProvider;
Expand All @@ -141,7 +136,7 @@ internal void RegisterClientFeatureProvider(string client, FeatureProvider provi
}
}

private void StartListeningAndShutdownOld(FeatureProviderReference newProvider, FeatureProviderReference oldProvider)
private void StartListeningAndShutdownOld(FeatureProvider newProvider, FeatureProvider oldProvider)
{
// check if the provider is already active - if not, we need to start listening for its emitted events
if (!this.IsProviderActive(newProvider))
Expand All @@ -154,15 +149,11 @@ private void StartListeningAndShutdownOld(FeatureProviderReference newProvider,
if (oldProvider != null && !this.IsProviderBound(oldProvider))
{
this._activeSubscriptions.Remove(oldProvider);
var channel = oldProvider.Provider.GetEventChannel();
if (channel != null)
{
channel.Writer.WriteAsync(new ShutdownSignal());
}
oldProvider.GetEventChannel()?.Writer.Complete();
}
}

private bool IsProviderBound(FeatureProviderReference provider)
private bool IsProviderBound(FeatureProvider provider)
{
if (this._defaultProvider == provider)
{
Expand All @@ -178,18 +169,18 @@ private bool IsProviderBound(FeatureProviderReference provider)
return false;
}

private bool IsProviderActive(FeatureProviderReference providerRef)
private bool IsProviderActive(FeatureProvider providerRef)
{
return this._activeSubscriptions.Contains(providerRef);
}

private void EmitOnRegistration(FeatureProviderReference provider, ProviderEventTypes eventType, EventHandlerDelegate handler)
private void EmitOnRegistration(FeatureProvider provider, ProviderEventTypes eventType, EventHandlerDelegate handler)
{
if (provider == null)
{
return;
}
var status = provider.Provider.GetStatus();
var status = provider.GetStatus();

var message = "";
if (status == ProviderStatus.Ready && eventType == ProviderEventTypes.ProviderReady)
Expand All @@ -211,7 +202,7 @@ private void EmitOnRegistration(FeatureProviderReference provider, ProviderEvent
{
handler.Invoke(new ProviderEventPayload
{
ProviderName = provider.Provider?.GetMetadata()?.Name,
ProviderName = provider.GetMetadata()?.Name,
Type = eventType,
Message = message
});
Expand All @@ -225,33 +216,33 @@ private void EmitOnRegistration(FeatureProviderReference provider, ProviderEvent

private async void ProcessFeatureProviderEventsAsync(object providerRef)
{
while (true)
var typedProviderRef = (FeatureProvider)providerRef;
if (typedProviderRef.GetEventChannel() is not { Reader: {} reader })
{
var typedProviderRef = (FeatureProviderReference)providerRef;
if (typedProviderRef.Provider.GetEventChannel() == null)
{
return;
}
var item = await typedProviderRef.Provider.GetEventChannel().Reader.ReadAsync().ConfigureAwait(false);
return;
}

while (await reader.WaitToReadAsync().ConfigureAwait(false))
{
if (!reader.TryRead(out var item))
continue;

Check warning on line 228 in src/OpenFeature/EventExecutor.cs

View check run for this annotation

Codecov / codecov/patch

src/OpenFeature/EventExecutor.cs#L228

Added line #L228 was not covered by tests

switch (item)
{
case ProviderEventPayload eventPayload:
await this.EventChannel.Writer.WriteAsync(new Event { Provider = typedProviderRef, EventPayload = eventPayload }).ConfigureAwait(false);
break;
case ShutdownSignal _:
typedProviderRef.ShutdownSemaphore.Release();
return;
}
}
}

// Method to process events
private async void ProcessEventAsync()
{
while (true)
while (await this.EventChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
var item = await this.EventChannel.Reader.ReadAsync().ConfigureAwait(false);
if (!this.EventChannel.Reader.TryRead(out var item))
continue;

Check warning on line 245 in src/OpenFeature/EventExecutor.cs

View check run for this annotation

Codecov / codecov/patch

src/OpenFeature/EventExecutor.cs#L245

Added line #L245 was not covered by tests

switch (item)
{
Expand Down Expand Up @@ -307,9 +298,6 @@ private async void ProcessEventAsync()
}
}
break;
case ShutdownSignal _:
this._shutdownSemaphore.Release();
return;
}

}
Expand All @@ -329,43 +317,15 @@ private void InvokeEventHandler(EventHandlerDelegate eventHandler, Event e)

public async Task Shutdown()
{
await this._shutdownDelegate().ConfigureAwait(false);
}
this.EventChannel.Writer.Complete();

internal void SetShutdownDelegate(ShutdownDelegate del)
{
this._shutdownDelegate = del;
}

// Method to signal shutdown
private async Task SignalShutdownAsync()
{
// Enqueue a shutdown signal
await this.EventChannel.Writer.WriteAsync(new ShutdownSignal()).ConfigureAwait(false);

// Wait for the processing loop to acknowledge the shutdown
await this._shutdownSemaphore.WaitAsync().ConfigureAwait(false);
}
}

internal class ShutdownSignal
{
}

internal class FeatureProviderReference
{
internal readonly SemaphoreSlim ShutdownSemaphore = new SemaphoreSlim(0);
internal FeatureProvider Provider { get; }

public FeatureProviderReference(FeatureProvider provider)
{
this.Provider = provider;
await this.EventChannel.Reader.Completion.ConfigureAwait(false);
}
}

internal class Event
{
internal FeatureProviderReference Provider { get; set; }
internal FeatureProvider Provider { get; set; }
internal ProviderEventPayload EventPayload { get; set; }
}
}
1 change: 1 addition & 0 deletions src/OpenFeature/OpenFeature.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="$(MicrosoftBclAsyncInterfacesVer)" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftExtensionsLoggerVer)" />
</ItemGroup>

Expand Down
4 changes: 2 additions & 2 deletions src/OpenFeature/OpenFeatureClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ public FeatureClient(string name, string version, ILogger logger = null, Evaluat
/// <inheritdoc />
public void AddHandler(ProviderEventTypes eventType, EventHandlerDelegate handler)
{
Api.Instance.EventExecutor.AddClientHandler(this._metadata.Name, eventType, handler);
Api.Instance.AddClientHandler(this._metadata.Name, eventType, handler);
}

/// <inheritdoc />
public void RemoveHandler(ProviderEventTypes type, EventHandlerDelegate handler)
{
Api.Instance.EventExecutor.RemoveClientHandler(this._metadata.Name, type, handler);
Api.Instance.RemoveClientHandler(this._metadata.Name, type, handler);
}

/// <inheritdoc />
Expand Down
10 changes: 9 additions & 1 deletion src/OpenFeature/ProviderRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace OpenFeature
/// <summary>
/// This class manages the collection of providers, both default and named, contained by the API.
/// </summary>
internal class ProviderRepository
internal class ProviderRepository : IAsyncDisposable
{
private FeatureProvider _defaultProvider = new NoOpFeatureProvider();

Expand All @@ -31,6 +31,14 @@ internal class ProviderRepository
/// of that provider under different names..
private readonly ReaderWriterLockSlim _providersLock = new ReaderWriterLockSlim();

public async ValueTask DisposeAsync()
{
using (this._providersLock)
{
await this.Shutdown().ConfigureAwait(false);
}
}

/// <summary>
/// Set the default provider
/// </summary>
Expand Down
8 changes: 0 additions & 8 deletions test/OpenFeature.Tests/OpenFeatureTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ namespace OpenFeature.Tests
{
public class OpenFeatureTests : ClearOpenFeatureInstanceFixture
{
static async Task EmptyShutdown()
{
await Task.FromResult(0).ConfigureAwait(false);
}

[Fact]
[Specification("1.1.1", "The `API`, and any state it maintains SHOULD exist as a global singleton, even in cases wherein multiple versions of the `API` are present at runtime.")]
public void OpenFeature_Should_Be_Singleton()
Expand Down Expand Up @@ -79,9 +74,6 @@ public async Task OpenFeature_Should_Shutdown_Unused_Provider()
[Specification("1.6.1", "The API MUST define a mechanism to propagate a shutdown request to active providers.")]
public async Task OpenFeature_Should_Support_Shutdown()
{
// configure the shutdown method of the event executor to do nothing
// to prevent eventing tests from failing
Api.Instance.EventExecutor.SetShutdownDelegate(EmptyShutdown);
var providerA = Substitute.For<FeatureProvider>();
providerA.GetStatus().Returns(ProviderStatus.NotReady);

Expand Down

0 comments on commit 4c5d79f

Please sign in to comment.