Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frank/lobster spike #952

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions source/Octopus.Tentacle.Client/ITentacleClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ Task<ScriptExecutionResult> ExecuteScript(
OnScriptCompleted onScriptCompleted,
ITentacleClientTaskLog logger,
CancellationToken scriptExecutionCancellationToken);

Task UpdateResources(string[] resources, ITentacleClientTaskLog logger, CancellationToken cancellationToken);
}
}
41 changes: 40 additions & 1 deletion source/Octopus.Tentacle.Client/TentacleClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Octopus.Tentacle.Contracts.ClientServices;
using Octopus.Tentacle.Contracts.KubernetesScriptServiceV1;
using Octopus.Tentacle.Contracts.KubernetesScriptServiceV1Alpha;
using Octopus.Tentacle.Contracts.LiveObjectStatusServiceV1;
using Octopus.Tentacle.Contracts.Logging;
using Octopus.Tentacle.Contracts.Observability;
using Octopus.Tentacle.Contracts.ScriptServiceV2;
Expand All @@ -28,6 +29,7 @@ public class TentacleClient : ITentacleClient

readonly IAsyncClientScriptService scriptServiceV1;
readonly IAsyncClientScriptServiceV2 scriptServiceV2;
readonly IAsyncClientLiveObjectStatusServiceV1 liveObjectStatusServiceV1;
readonly IAsyncClientKubernetesScriptServiceV1Alpha kubernetesScriptServiceV1Alpha;
readonly IAsyncClientKubernetesScriptServiceV1 kubernetesScriptServiceV1;
readonly IAsyncClientFileTransferService clientFileTransferServiceV1;
Expand Down Expand Up @@ -85,7 +87,8 @@ internal TentacleClient(
kubernetesScriptServiceV1 = halibutRuntime.CreateAsyncClient<IKubernetesScriptServiceV1, IAsyncClientKubernetesScriptServiceV1>(serviceEndPoint);
clientFileTransferServiceV1 = halibutRuntime.CreateAsyncClient<IFileTransferService, IAsyncClientFileTransferService>(serviceEndPoint);
capabilitiesServiceV2 = halibutRuntime.CreateAsyncClient<ICapabilitiesServiceV2, IAsyncClientCapabilitiesServiceV2>(serviceEndPoint).WithBackwardsCompatability();

liveObjectStatusServiceV1 = halibutRuntime.CreateAsyncClient<ILiveObjectStatusServiceV1, IAsyncClientLiveObjectStatusServiceV1>(serviceEndPoint);

if (tentacleServicesDecoratorFactory != null)
{
scriptServiceV1 = tentacleServicesDecoratorFactory.Decorate(scriptServiceV1);
Expand All @@ -94,13 +97,49 @@ internal TentacleClient(
kubernetesScriptServiceV1 = tentacleServicesDecoratorFactory.Decorate(kubernetesScriptServiceV1);
clientFileTransferServiceV1 = tentacleServicesDecoratorFactory.Decorate(clientFileTransferServiceV1);
capabilitiesServiceV2 = tentacleServicesDecoratorFactory.Decorate(capabilitiesServiceV2);
//liveObjectStatusServiceV1 = tentacleServicesDecoratorFactory.Decorate(liveObjectStatusServiceV1)
}

rpcCallExecutor = RpcCallExecutorFactory.Create(this.clientOptions.RpcRetrySettings.RetryDuration, this.tentacleClientObserver);
}

public TimeSpan OnCancellationAbandonCompleteScriptAfter { get; set; } = TimeSpan.FromMinutes(1);

public async Task UpdateResources(string[] resources, ITentacleClientTaskLog logger, CancellationToken cancellationToken)
{
var operationMetricsBuilder = ClientOperationMetricsBuilder.Start();

async Task<bool> UpdateResources(CancellationToken ct)
{
logger.Info($"Beginning update resources to Tentacle");
await liveObjectStatusServiceV1.UpdateResources(resources, new HalibutProxyRequestOptions(ct));
logger.Info("Upload complete");

return true;
}

try
{
await rpcCallExecutor.Execute(
retriesEnabled: clientOptions.RpcRetrySettings.RetriesEnabled,
RpcCall.Create<ILiveObjectStatusServiceV1>(nameof(ILiveObjectStatusServiceV1.UpdateResources)),
UpdateResources,
logger,
operationMetricsBuilder,
cancellationToken).ConfigureAwait(false);
}
catch (Exception e)
{
operationMetricsBuilder.Failure(e, cancellationToken);
throw;
}
finally
{
var operationMetrics = operationMetricsBuilder.Build();
tentacleClientObserver.UploadFileCompleted(operationMetrics, logger);
}
}

public async Task<UploadResult> UploadFile(string fileName, string path, DataStream package, ITentacleClientTaskLog logger, CancellationToken cancellationToken)
{
var operationMetricsBuilder = ClientOperationMetricsBuilder.Start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Threading.Tasks;
using Halibut.ServiceModel;

namespace Octopus.Tentacle.Contracts.ClientServices
{
public interface IAsyncClientLiveObjectStatusServiceV1
{
Task UpdateResources(string[] resources, HalibutProxyRequestOptions proxyRequestOptions);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Octopus.Tentacle.Contracts.LiveObjectStatusServiceV1
{
public interface ILiveObjectStatusServiceV1
{
void UpdateResources(string[] resources);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Halibut;
using Halibut.Diagnostics;
using Octopus.Client.Model;
using Octopus.Client.Model.Endpoints;
using Octopus.Diagnostics;
using Octopus.Tentacle.Configuration;

namespace Octopus.Tentacle.Communications
{
public class HalibutEndpointDiscovery
{
readonly IWritableTentacleConfiguration configuration;
readonly IProxyConfigParser proxyConfigParser;
readonly ISystemLog log;

public HalibutEndpointDiscovery(IWritableTentacleConfiguration configuration, IProxyConfigParser proxyConfigParser, ISystemLog log)
{
this.configuration = configuration;
this.proxyConfigParser = proxyConfigParser;
this.log = log;
}

public IEnumerable<ServiceEndPoint> GetPollingEndpoints()
{
foreach (var pollingEndPoint in GetOctopusServersToPoll())
{
if (pollingEndPoint.Address == null)
{
log.WarnFormat("Configured to connect to server {0}, but its configuration is incomplete; skipping.", pollingEndPoint);
continue;
}

#pragma warning disable 618
pollingEndPoint.SubscriptionId ??= "poll://" + configuration.TentacleSquid?.ToLowerInvariant() + "/";
#pragma warning restore 618

log.Info($"Agent will poll Octopus Server at {pollingEndPoint.Address} for subscription {pollingEndPoint.SubscriptionId} expecting thumbprint {pollingEndPoint.Thumbprint}");
var halibutProxy = proxyConfigParser.ParseToHalibutProxy(configuration.PollingProxyConfiguration, pollingEndPoint.Address, log);

var halibutTimeoutsAndLimits = new HalibutTimeoutsAndLimits();
var serviceEndPoint = new ServiceEndPoint(pollingEndPoint.Address, pollingEndPoint.Thumbprint, halibutProxy, halibutTimeoutsAndLimits);

yield return serviceEndPoint;
}
}

IEnumerable<OctopusServerConfiguration> GetOctopusServersToPoll()
{
return configuration.TrustedOctopusServers.Where(octopusServerConfiguration =>
octopusServerConfiguration.CommunicationStyle == CommunicationStyle.TentacleActive ||
(octopusServerConfiguration is { CommunicationStyle: CommunicationStyle.KubernetesTentacle } &&
octopusServerConfiguration.KubernetesTentacleCommunicationMode == TentacleCommunicationModeResource.Polling));
}
}
}
29 changes: 26 additions & 3 deletions source/Octopus.Tentacle/Communications/HalibutInitializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Halibut;
using Halibut.Diagnostics;
using Octopus.Client.Model;
Expand Down Expand Up @@ -35,8 +36,8 @@ public void Start()

TrustOctopusServers();

AddPollingEndpoints();

var endpoints = AddPollingEndpoints().ToList();
if (configuration.NoListen)
{
log.Info("Agent will not listen on any TCP ports");
Expand Down Expand Up @@ -85,7 +86,7 @@ void TrustOctopusServers()



void AddPollingEndpoints()
IEnumerable<ServiceEndPoint> AddPollingEndpoints()
{
foreach (var pollingEndPoint in GetOctopusServersToPoll())
{
Expand All @@ -110,6 +111,8 @@ void AddPollingEndpoints()
for (var i = 0; i < connectionCount; i++)
{
halibut.Poll(new Uri(pollingEndPoint.SubscriptionId), serviceEndPoint, CancellationToken.None);
yield return serviceEndPoint;
//yield return new ServiceEndPoint(new Uri(pollingEndPoint.SubscriptionId), pollingEndPoint.Thumbprint, halibutTimeoutsAndLimits);
}
}
}
Expand Down Expand Up @@ -174,4 +177,24 @@ public void Stop()
{
}
}






public interface IMyEchoService
{
string SayHello(string name);
}

public interface IAsyncClientMyEchoService
{
Task<string> SayHelloAsync(string name);
}

public interface IAsyncMyEchoService
{
Task<string> SayHelloAsync(string name, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Halibut;
using k8s;
using k8s.Models;
using Octopus.Diagnostics;
using Octopus.Tentacle.Communications;
using Octopus.Tentacle.Time;
using Octopus.Tentacle.Util;
using Polly;

namespace Octopus.Tentacle.Kubernetes
{
public class KubernetesLiveObjectStatusService : KubernetesService
{
readonly ISystemLog log;
readonly HalibutRuntime halibut;
readonly HalibutEndpointDiscovery endpointDiscovery;

public KubernetesLiveObjectStatusService(IKubernetesClientConfigProvider configProvider, ISystemLog log, HalibutRuntime halibut, HalibutEndpointDiscovery endpointDiscovery)
: base(configProvider, log)
{
this.log = log;
this.halibut = halibut;
this.endpointDiscovery = endpointDiscovery;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
const int maxDurationSeconds = 70;

// We don't want the monitoring to ever stop
var policy = Policy.Handle<Exception>().WaitAndRetryForeverAsync(
retry => TimeSpan.FromSeconds(ExponentialBackoff.GetDuration(retry, maxDurationSeconds)),
(ex, duration) =>
{
log.Error(ex, "An unexpected error occured while monitoring Pods, waiting for: " + duration);
});

await policy.ExecuteAsync(async ct => await UpdateLoop(ct), cancellationToken);
}

async Task UpdateLoop(CancellationToken cancellationToken)
{
var c = halibut.CreateAsyncClient<IMyEchoService, IAsyncClientMyEchoService>(endpointDiscovery.GetPollingEndpoints().First());

while (!cancellationToken.IsCancellationRequested)
{
foreach (var @namespace in LobsterResources.NamespacesToMonitor())
{
var pods = await Client.ListNamespacedPodAsync(@namespace, cancellationToken: cancellationToken);

foreach (var pod in pods)
{
var response = await c.SayHelloAsync($"Pod {pod.Namespace()}:{pod.Name()} - {pod.Status.Phase}");
log.Info("Message Back From Server:" + response);
}
}


await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken);
}
}
}

public static class LobsterResources
{
static readonly HashSet<string> namespaces = new HashSet<string>() { "octopus-agent-agentlobs" };
public static string[] NamespacesToMonitor()
{
lock(namespaces)
return namespaces.ToArray();
}

public static void UpdateNamespaces(string[] namespaces)
{
lock(namespaces)
namespaces.AddRange(namespaces);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Octopus.Diagnostics;
using Octopus.Tentacle.Background;

namespace Octopus.Tentacle.Kubernetes
{
public class KubernetesLiveObjectStatusTask : BackgroundTask
{
readonly KubernetesLiveObjectStatusService podMonitor;

public KubernetesLiveObjectStatusTask(KubernetesLiveObjectStatusService podMonitor, ISystemLog log) : base(log, TimeSpan.FromSeconds(30))
{
this.podMonitor = podMonitor;
}

protected override async Task RunTask(CancellationToken cancellationToken)
{
await podMonitor.StartAsync(cancellationToken);
}
}
}
4 changes: 4 additions & 0 deletions source/Octopus.Tentacle/Kubernetes/KubernetesModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Options;
using Octopus.Tentacle.Background;
using Octopus.Tentacle.Communications;

namespace Octopus.Tentacle.Kubernetes
{
public class KubernetesModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterType<HalibutEndpointDiscovery>().SingleInstance();
builder.RegisterType<KubernetesLiveObjectStatusService>().SingleInstance();
builder.RegisterType<KubernetesPodService>().As<IKubernetesPodService>().SingleInstance();
builder.RegisterType<KubernetesClusterService>().As<IKubernetesClusterService>().SingleInstance();
builder.RegisterType<KubernetesPodContainerResolver>().As<IKubernetesPodContainerResolver>().SingleInstance();
Expand All @@ -20,6 +23,7 @@ protected override void Load(ContainerBuilder builder)
builder.RegisterType<ScriptPodSinceTimeStore>().As<IScriptPodSinceTimeStore>().SingleInstance();
builder.RegisterType<TentacleScriptLogProvider>().As<ITentacleScriptLogProvider>().SingleInstance();

builder.RegisterType<KubernetesLiveObjectStatusTask>().As<IBackgroundTask>().SingleInstance();
builder.RegisterType<KubernetesPodMonitorTask>().As<IKubernetesPodMonitorTask>().As<IBackgroundTask>().SingleInstance();
builder.RegisterType<KubernetesPodMonitor>().As<IKubernetesPodMonitor>().As<IKubernetesPodStatusProvider>().SingleInstance();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using Octopus.Diagnostics;
using Octopus.Tentacle.Contracts.LiveObjectStatusServiceV1;
using Octopus.Tentacle.Kubernetes;

namespace Octopus.Tentacle.Services.Scripts.Kubernetes
{
[KubernetesService(typeof(ILiveObjectStatusServiceV1))]
public class LiveObjectStatusServiceV1 : ILiveObjectStatusServiceV1
{
readonly ISystemLog log;

public LiveObjectStatusServiceV1(
ISystemLog log)
{
this.log = log;
}

public void UpdateResources(string[] resources)
{
log.Info("Resources: " + string.Join(", ", resources));

LobsterResources.UpdateNamespaces(resources);
}
}
}