Skip to content

Commit

Permalink
SteeltoeOSS#1312 Add in activity creation
Browse files Browse the repository at this point in the history
  • Loading branch information
thompson-tomo committed Jun 8, 2024
1 parent 8ae9892 commit 64b24ae
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Steeltoe.Extensions.Configuration.Placeholder;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Net;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class ConfigServerConfigurationProvider : ConfigurationProvider, IConfigu
private static readonly char[] COMMA_DELIMIT = new char[] { ',' };
private static readonly string[] EMPTY_LABELS = new string[] { string.Empty };

private ActivitySource _configActivity = new ActivitySource("Steeltoe.Extensions.Configuration.ConfigServer.ConfigServerConfigurationProvider");

/// <summary>
/// Initializes a new instance of the <see cref="ConfigServerConfigurationProvider"/> class with default
/// configuration settings. <see cref="ConfigServerClientSettings"/>
Expand Down Expand Up @@ -182,13 +185,16 @@ private void OnSettingsChanged()
/// </remarks>
private void DoPolledLoad()
{
try
{
DoLoad();
}
catch (Exception e)
using (var activity = _configActivity.StartActivity(nameof(DoPolledLoad)))
{
_logger.LogWarning(e, "Could not reload configuration during polling");
try
{
DoLoad();
}
catch (Exception e)
{
_logger.LogWarning(e, "Could not reload configuration during polling");
}
}
}

Expand Down Expand Up @@ -309,77 +315,80 @@ internal ConfigEnvironment DoLoad(bool updateDictionary = true)
{
foreach (var label in GetLabels())
{
Task<ConfigEnvironment> task = null;

if (uris.Length > 1)
using (var activity = _configActivity.StartActivity(nameof(DoLoad)))
{
_logger.LogInformation("Multiple Config Server Uris listed.");
Task<ConfigEnvironment> task = null;

// Invoke config servers
task = RemoteLoadAsync(uris, label);
}
else
{
// Single, server make Config Server URI from settings
if (uris.Length > 1)
{
_logger.LogInformation("Multiple Config Server Uris listed.");

// Invoke config servers
task = RemoteLoadAsync(uris, label);
}
else
{
// Single, server make Config Server URI from settings
#pragma warning disable CS0618 // Type or member is obsolete
var path = GetConfigServerUri(label);
var path = GetConfigServerUri(label);

// Invoke config server
task = RemoteLoadAsync(path);
// Invoke config server
task = RemoteLoadAsync(path);
#pragma warning restore CS0618 // Type or member is obsolete
}
}

// Wait for results from server
var env = task.GetAwaiter().GetResult();
// Wait for results from server
var env = task.GetAwaiter().GetResult();

// Update config Data dictionary with any results
if (env != null)
{
_logger.LogInformation(
"Located environment name: {name}, profiles: {profiles}, labels: {label}, version: {version}, state: {state}", env.Name, env.Profiles, env.Label, env.Version, env.State);
if (updateDictionary)
// Update config Data dictionary with any results
if (env != null)
{
var data = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

if (!string.IsNullOrEmpty(env.State))
_logger.LogInformation(
"Located environment name: {name}, profiles: {profiles}, labels: {label}, version: {version}, state: {state}", env.Name, env.Profiles, env.Label, env.Version, env.State);
if (updateDictionary)
{
data["spring:cloud:config:client:state"] = env.State;
}
var data = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

if (!string.IsNullOrEmpty(env.Version))
{
data["spring:cloud:config:client:version"] = env.Version;
}
if (!string.IsNullOrEmpty(env.State))
{
data["spring:cloud:config:client:state"] = env.State;
}

var sources = env.PropertySources;
if (sources != null)
{
var index = sources.Count - 1;
for (; index >= 0; index--)
if (!string.IsNullOrEmpty(env.Version))
{
AddPropertySource(sources[index], data);
data["spring:cloud:config:client:version"] = env.Version;
}
}

// Adds client settings (e.g spring:cloud:config:uri, etc) back to the (new) Data dictionary
AddConfigServerClientSettings(data);
var sources = env.PropertySources;
if (sources != null)
{
var index = sources.Count - 1;
for (; index >= 0; index--)
{
AddPropertySource(sources[index], data);
}
}

static bool AreEqual<TKey, TValue>(IDictionary<TKey, TValue> dict1, IDictionary<TKey, TValue> dict2)
{
IEqualityComparer<TValue> valueComparer = EqualityComparer<TValue>.Default;
// Adds client settings (e.g spring:cloud:config:uri, etc) back to the (new) Data dictionary
AddConfigServerClientSettings(data);

return dict1.Count == dict2.Count &&
dict1.Keys.All(key => dict2.ContainsKey(key) && valueComparer.Equals(dict1[key], dict2[key]));
}
static bool AreEqual<TKey, TValue>(IDictionary<TKey, TValue> dict1, IDictionary<TKey, TValue> dict2)
{
IEqualityComparer<TValue> valueComparer = EqualityComparer<TValue>.Default;

if (!AreEqual(Data, data))
{
Data = data;
OnReload();
return dict1.Count == dict2.Count &&
dict1.Keys.All(key => dict2.ContainsKey(key) && valueComparer.Equals(dict1[key], dict2[key]));
}

if (!AreEqual(Data, data))
{
Data = data;
OnReload();
}
}
}

return env;
return env;
}
}
}
}
Expand Down Expand Up @@ -585,65 +594,68 @@ protected internal async Task<ConfigEnvironment> RemoteLoadAsync(string[] reques
Exception error = null;
foreach (var requestUri in requestUris)
{
error = null;

// Get a config server uri and username passwords to use
var trimUri = requestUri.Trim();
var serverUri = _settings.GetRawUri(trimUri);
var username = _settings.GetUserName(trimUri);
var password = _settings.GetPassword(trimUri);
using (var activity = _configActivity.StartActivity(nameof(RemoteLoadAsync)))
{
error = null;

// Make Config Server URI from settings
var path = GetConfigServerUri(serverUri, label);
// Get a config server uri and username passwords to use
var trimUri = requestUri.Trim();
var serverUri = _settings.GetRawUri(trimUri);
var username = _settings.GetUserName(trimUri);
var password = _settings.GetPassword(trimUri);

// Get the request message
var request = GetRequestMessage(path, username, password);
// Make Config Server URI from settings
var path = GetConfigServerUri(serverUri, label);

// If certificate validation is disabled, inject a callback to handle properly
var prevProtocols = (SecurityProtocolType)0;
HttpClientHelper.ConfigureCertificateValidation(_settings.ValidateCertificates, out prevProtocols, out var prevValidator);
// Get the request message
var request = GetRequestMessage(path, username, password);

// Invoke config server
try
{
using var response = await _httpClient.SendAsync(request).ConfigureAwait(false);
// If certificate validation is disabled, inject a callback to handle properly
var prevProtocols = (SecurityProtocolType)0;
HttpClientHelper.ConfigureCertificateValidation(_settings.ValidateCertificates, out prevProtocols, out var prevValidator);

_logger.LogInformation("Config Server returned status: {statusCode} invoking path: {requestUri}", response.StatusCode, WebUtility.UrlEncode(requestUri));
if (response.StatusCode != HttpStatusCode.OK)
// Invoke config server
try
{
if (response.StatusCode == HttpStatusCode.NotFound)
{
return null;
}
using var response = await _httpClient.SendAsync(request).ConfigureAwait(false);

// Throw if status >= 400
if (response.StatusCode >= HttpStatusCode.BadRequest)
_logger.LogInformation("Config Server returned status: {statusCode} invoking path: {requestUri}", response.StatusCode, WebUtility.UrlEncode(requestUri));
if (response.StatusCode != HttpStatusCode.OK)
{
// HttpClientErrorException
throw new HttpRequestException($"Config Server returned status: {response.StatusCode} invoking path: {WebUtility.UrlEncode(requestUri)}");
if (response.StatusCode == HttpStatusCode.NotFound)
{
return null;
}

// Throw if status >= 400
if (response.StatusCode >= HttpStatusCode.BadRequest)
{
// HttpClientErrorException
throw new HttpRequestException($"Config Server returned status: {response.StatusCode} invoking path: {WebUtility.UrlEncode(requestUri)}");
}
else
{
return null;
}
}
else

return await response.Content.ReadFromJsonAsync<ConfigEnvironment>(SerializerOptions).ConfigureAwait(false);
}
catch (Exception e)
{
error = e;
_logger.LogError(e, "Config Server exception, path: {requestUri}", WebUtility.UrlEncode(requestUri));
if (IsContinueExceptionType(e))
{
return null;
continue;
}
}

return await response.Content.ReadFromJsonAsync<ConfigEnvironment>(SerializerOptions).ConfigureAwait(false);
}
catch (Exception e)
{
error = e;
_logger.LogError(e, "Config Server exception, path: {requestUri}", WebUtility.UrlEncode(requestUri));
if (IsContinueExceptionType(e))
throw;
}
finally
{
continue;
HttpClientHelper.RestoreCertificateValidation(_settings.ValidateCertificates, prevProtocols, prevValidator);
}

throw;
}
finally
{
HttpClientHelper.RestoreCertificateValidation(_settings.ValidateCertificates, prevProtocols, prevValidator);
}
}

Expand Down
36 changes: 21 additions & 15 deletions src/Configuration/src/KubernetesBase/KubernetesConfigMapProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,28 @@ internal KubernetesConfigMapProvider(IKubernetes kubernetes, KubernetesConfigSou

public override void Load()
{
try
using (var activity = _kubernetesActivity.StartActivity(nameof(Load)))
{
var configMapResponse = K8sClient.ReadNamespacedConfigMapWithHttpMessagesAsync(Settings.Name, Settings.Namespace).GetAwaiter().GetResult();
ProcessData(configMapResponse.Body);
EnableReloading();
}
catch (HttpOperationException e)
{
if (e.Response.StatusCode == HttpStatusCode.Forbidden)
{
Logger?.LogCritical(e, "Failed to retrieve config map '{configmapName}' in namespace '{configmapNamespace}'. Confirm that your service account has the necessary permissions", Settings.Name, Settings.Namespace);
}
else if (e.Response.StatusCode == HttpStatusCode.NotFound)
try
{
var configMapResponse = K8sClient.ReadNamespacedConfigMapWithHttpMessagesAsync(Settings.Name, Settings.Namespace).GetAwaiter().GetResult();
ProcessData(configMapResponse.Body);
EnableReloading();
return;
}
catch (HttpOperationException e)
{
if (e.Response.StatusCode == HttpStatusCode.Forbidden)
{
Logger?.LogCritical(e, "Failed to retrieve config map '{configmapName}' in namespace '{configmapNamespace}'. Confirm that your service account has the necessary permissions", Settings.Name, Settings.Namespace);
}
else if (e.Response.StatusCode == HttpStatusCode.NotFound)
{
EnableReloading();
return;
}

throw;
throw;
}
}
}

Expand Down Expand Up @@ -89,7 +92,9 @@ private void EnableReloading()
switch (Settings.ReloadSettings.Mode)
{
case ReloadMethods.Event:
ConfigMapWatcher = K8sClient.WatchNamespacedConfigMapAsync(
using (var activity = _kubernetesActivity.StartActivity(nameof(EnableReloading)))
{
ConfigMapWatcher = K8sClient.WatchNamespacedConfigMapAsync(
Settings.Name,
Settings.Namespace,
onEvent: (eventType, item) =>
Expand All @@ -112,6 +117,7 @@ private void EnableReloading()
Logger?.LogCritical(exception, "ConfigMap watcher on {namespace}.{name} encountered an error!", Settings.Namespace, Settings.Name);
},
onClosed: () => { Logger?.LogInformation("ConfigMap watcher on {namespace}.{name} connection has closed", Settings.Namespace, Settings.Name); }).GetAwaiter().GetResult();
}
break;
case ReloadMethods.Polling:
StartPolling(Settings.ReloadSettings.Period);
Expand Down
17 changes: 11 additions & 6 deletions src/Configuration/src/KubernetesBase/KubernetesProviderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ internal class KubernetesProviderBase : ConfigurationProvider

protected ILogger Logger => Settings.LoggerFactory?.CreateLogger(this.GetType());

private ActivitySource _kubernetesActivity = new ActivitySource("Steeltoe.Extensions.Configuration.Kubernetes.KubernetesProviderBase");

internal KubernetesProviderBase(IKubernetes kubernetes, KubernetesConfigSourceSettings settings, CancellationToken token = default)
{
if (kubernetes is null)
Expand Down Expand Up @@ -58,13 +60,16 @@ protected void StartPolling(int interval)
Polling = true;
while (Polling)
{
Thread.Sleep(TimeSpan.FromSeconds(interval));
Logger?.LogTrace("Interval completed for {namespace}.{name}, beginning reload", Settings.Namespace, Settings.Name);
Load();
if (CancellationToken.IsCancellationRequested)
using (var activity = _kubernetesActivity.StartActivity(nameof(StartPolling)))
{
Logger?.LogTrace("Cancellation requested for {namespace}.{name}, shutting down", Settings.Namespace, Settings.Name);
break;
Thread.Sleep(TimeSpan.FromSeconds(interval));
Logger?.LogTrace("Interval completed for {namespace}.{name}, beginning reload", Settings.Namespace, Settings.Name);
Load();
if (CancellationToken.IsCancellationRequested)
{
Logger?.LogTrace("Cancellation requested for {namespace}.{name}, shutting down", Settings.Namespace, Settings.Name);
break;
}
}
}
},
Expand Down
Loading

0 comments on commit 64b24ae

Please sign in to comment.