Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added events mechanism for usage in gator #267

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Gigya.Microdot.Fakes/Discovery/AlwaysLocalHostDiscovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,15 @@ public async Task<Node[]> GetNodes(DeploymentIdentifier deploymentIdentifier)
public void Dispose()
{
}

public TaskCompletionSource<(string version, Node[] nodes)> WaitForServiceChanges(DeploymentIdentifier deploymentIdentifier)
{
throw new NotImplementedException();
}

Task<(string version, Node[] nodes)> IDiscovery.WaitForServiceChanges(DeploymentIdentifier deploymentIdentifier)
{
throw new NotImplementedException();
}
}
}
5 changes: 5 additions & 0 deletions Gigya.Microdot.ServiceDiscovery/Rewrite/ConfigNodeSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,10 @@ public void Dispose()
{
// nothing to shutdown
}

public void RegisterForSchemaChangeEvent(DeploymentIdentifier deploymentIdentifier, System.Threading.Tasks.TaskCompletionSource<(string version, Node[] nodes)> tcs)
{
throw new NotImplementedException();
}
}
}
5 changes: 3 additions & 2 deletions Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ private async Task<ConsulResponse<T>> Call<T>(string commandPath, CancellationTo
using (var timeoutcancellationToken = new CancellationTokenSource(httpTaskTimeout))
using (var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutcancellationToken.Token))
{
var response = await _httpClient.GetAsync(commandPath, HttpCompletionOption.ResponseContentRead, cancellationSource.Token).ConfigureAwait(false);
using (response)
var response = await _httpClient.GetAsync(commandPath, HttpCompletionOption.ResponseContentRead, cancellationSource.Token).ConfigureAwait(false);

using (response)
{
responseContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
consulResult.StatusCode = response.StatusCode;
Expand Down
44 changes: 39 additions & 5 deletions Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulNodeSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -55,16 +57,19 @@ internal class ConsulNodeSource : INodeSource

private HealthMessage _healthStatus = new HealthMessage(Health.Info, message: null, suppressMessage: true);
private readonly IDisposable _healthCheck;
private ConcurrentDictionary<TaskCompletionSource<(string version, Node[] nodes)>, Boolean> _taskCompletionSourcesList { get; }

public ConsulNodeSource(

public ConsulNodeSource(
DeploymentIdentifier deploymentIdentifier,
ILog log,
ConsulClient consulClient,
IDateTime dateTime,
Func<ConsulConfig> getConfig,
Func<string, AggregatingHealthStatus> getAggregatingHealthStatus)
{
DeploymentIdentifier = deploymentIdentifier;
_taskCompletionSourcesList = new ConcurrentDictionary<TaskCompletionSource<(string version, Node[] nodes)>, Boolean>();
DeploymentIdentifier = deploymentIdentifier;
Log = log;
ConsulClient = consulClient;
DateTime = dateTime;
Expand All @@ -89,7 +94,10 @@ public Node[] GetNodes()
else return nodes.Nodes;
}


public void RegisterForSchemaChangeEvent(DeploymentIdentifier deploymentIdentifier, TaskCompletionSource<(string version, Node[] nodes)> tcs)
{
_taskCompletionSourcesList.TryAdd(tcs, false);
}


private async Task UpdateLoop()
Expand All @@ -111,8 +119,34 @@ private async Task UpdateLoop()
if (deploymentVersionTask.IsCompleted)
{
deploymentVersion = deploymentVersionTask.Result;
if (deploymentVersion.Error == null && deploymentVersion.IsUndeployed == false)
lastKnownDeploymentVersion = deploymentVersion.ResponseObject;
if (deploymentVersion.Error == null && deploymentVersion.IsUndeployed == false)
{
if (lastKnownDeploymentVersion != deploymentVersion.ResponseObject)
{
List<TaskCompletionSource<(string version, Node[] nodes)>> toRemoveList = new List<TaskCompletionSource<(string version, Node[] nodes)>>();

foreach (KeyValuePair<TaskCompletionSource<(string version, Node[] nodes)>, Boolean> entry in _taskCompletionSourcesList)
{
if (entry.Value == false)
{
entry.Key.SetResult((deploymentVersion.ResponseObject, null));

_taskCompletionSourcesList.TryUpdate(entry.Key, false, true);

toRemoveList.Add(entry.Key);
}
}

foreach (TaskCompletionSource<(string version, Node[] nodes)> key in toRemoveList)
{
var val = new Boolean();
_taskCompletionSourcesList.TryRemove(key, out val);
}
}

lastKnownDeploymentVersion = deploymentVersion.ResponseObject;
}

deploymentVersionTask = ConsulClient.GetDeploymentVersion(DeploymentIdentifier, deploymentVersion.ModifyIndex ?? InitialModifyIndex, _shutdownToken.Token);
}

Expand Down
38 changes: 35 additions & 3 deletions Gigya.Microdot.ServiceDiscovery/Rewrite/Discovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ internal sealed class Discovery : IDiscovery
private Func<DeploymentIdentifier, ConfigNodeSource> CreateConfigNodeSource { get; }
private Dictionary<string, INodeSourceFactory> NodeSourceFactories { get; }

class NodeSourceAndAccesstime

class NodeSourceAndAccesstime
{
public string NodeSourceType;
public Task<INodeSource> NodeSourceTask;
Expand Down Expand Up @@ -77,15 +78,13 @@ public Discovery(Func<DiscoveryConfig> getConfig,
}



/// <inheritdoc />
public ILoadBalancer CreateLoadBalancer(DeploymentIdentifier deploymentIdentifier, ReachabilityCheck reachabilityCheck, TrafficRoutingStrategy trafficRoutingStrategy)
{
return _createLoadBalancer(deploymentIdentifier, reachabilityCheck, trafficRoutingStrategy);
}



/// <inheritdoc />
public async Task<Node[]> GetNodes(DeploymentIdentifier deploymentIdentifier)
{
Expand Down Expand Up @@ -115,7 +114,40 @@ public async Task<Node[]> GetNodes(DeploymentIdentifier deploymentIdentifier)
else return null;
}

public async Task<(string version, Node[] nodes)> WaitForServiceChanges(DeploymentIdentifier deploymentIdentifier)
{
TaskCompletionSource<(string version, Node[] nodes)> tcs = new TaskCompletionSource<(string version, Node[] nodes)>();

// We have a cached node source; query it
if (_nodeSources.TryGetValue(deploymentIdentifier, out Lazy<NodeSourceAndAccesstime> lazySource))
{
lazySource.Value.LastAccessTime = DateTime.UtcNow;
var nodeSource = await lazySource.Value.NodeSourceTask.ConfigureAwait(false);

nodeSource.RegisterForSchemaChangeEvent(deploymentIdentifier, tcs);
}

// No node source but the service is deployed; create one and query it
else if (await IsServiceDeployed(deploymentIdentifier).ConfigureAwait(false))
{
string sourceType = GetConfiguredSourceType(deploymentIdentifier);
lazySource = _nodeSources.GetOrAdd(deploymentIdentifier, _ => new Lazy<NodeSourceAndAccesstime>(() =>
new NodeSourceAndAccesstime
{
NodeSourceType = sourceType,
LastAccessTime = DateTime.UtcNow,
NodeSourceTask = CreateNodeSource(sourceType, deploymentIdentifier)
}));
var nodeSource = await lazySource.Value.NodeSourceTask.ConfigureAwait(false);

nodeSource.RegisterForSchemaChangeEvent(deploymentIdentifier, tcs);
}

// No node source and the service is not deployed; return null
else return (null, null);

return await tcs.Task;
}

private async Task<bool> IsServiceDeployed(DeploymentIdentifier deploymentIdentifier)
{
Expand Down
2 changes: 2 additions & 0 deletions Gigya.Microdot.ServiceDiscovery/Rewrite/IDiscovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,7 @@ public interface IDiscovery: IDisposable
/// <param name="deploymentIdentifier">identifier for service and env for which LoadBalancer is requested</param>
/// <returns>a list of <see cref="Node"/>, or null if the service is not deployed in the requested environment</returns>
Task<Node[]> GetNodes(DeploymentIdentifier deploymentIdentifier);

Task<(string version, Node[] nodes)> WaitForServiceChanges(DeploymentIdentifier deploymentIdentifier);
}
}
3 changes: 3 additions & 0 deletions Gigya.Microdot.ServiceDiscovery/Rewrite/INodeSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#endregion

using System;
using System.Threading.Tasks;
using Gigya.Common.Contracts.Exceptions;
using Gigya.Microdot.SharedLogic.Rewrite;

Expand All @@ -37,5 +38,7 @@ public interface INodeSource: IDisposable
/// <returns>A non-empty array of nodes.</returns>
/// <exception cref="EnvironmentException">Thrown when no nodes are available, the service was undeployed or an error occurred.</exception>
Node[] GetNodes();

void RegisterForSchemaChangeEvent(DeploymentIdentifier deploymentIdentifier, TaskCompletionSource<(string version, Node[] nodes)> tcs);
}
}
5 changes: 5 additions & 0 deletions Gigya.Microdot.ServiceDiscovery/Rewrite/LocalNodeSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public void Dispose()
{
// nothing to shutdown
}

public void RegisterForSchemaChangeEvent(DeploymentIdentifier deploymentIdentifier, System.Threading.Tasks.TaskCompletionSource<(string version, Node[] nodes)> tcs)
{
throw new System.NotImplementedException();
}
}

}