Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Luke/event driven script orchestrator #966

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions source/Octopus.Tentacle.Client/AggregateScriptExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Halibut;
using Octopus.Tentacle.Client.EventDriven;
using Octopus.Tentacle.Client.Execution;
using Octopus.Tentacle.Client.Observability;
using Octopus.Tentacle.Client.Scripts;
using Octopus.Tentacle.Client.Scripts.Models;
using Octopus.Tentacle.Client.ServiceHelpers;
using Octopus.Tentacle.Contracts;
using Octopus.Tentacle.Contracts.Logging;
using Octopus.Tentacle.Contracts.Observability;

namespace Octopus.Tentacle.Client
{
/// <summary>
/// Executes scripts, on the best available script service.
/// </summary>
public class AggregateScriptExecutor : IScriptExecutor
{
readonly ITentacleClientTaskLog logger;
readonly ITentacleClientObserver tentacleClientObserver;
readonly TentacleClientOptions clientOptions;
readonly ClientsHolder clientsHolder;
readonly RpcCallExecutor rpcCallExecutor;
readonly TimeSpan onCancellationAbandonCompleteScriptAfter;

public AggregateScriptExecutor(ITentacleClientTaskLog logger,
ITentacleClientObserver tentacleClientObserver,
TentacleClientOptions clientOptions,
IHalibutRuntime halibutRuntime,
ServiceEndPoint serviceEndPoint, TimeSpan onCancellationAbandonCompleteScriptAfter) : this(logger, tentacleClientObserver, clientOptions, halibutRuntime, serviceEndPoint, null, onCancellationAbandonCompleteScriptAfter)
{
}

internal AggregateScriptExecutor(ITentacleClientTaskLog logger,
ITentacleClientObserver tentacleClientObserver,
TentacleClientOptions clientOptions,
IHalibutRuntime halibutRuntime,
ServiceEndPoint serviceEndPoint,
ITentacleServiceDecoratorFactory? tentacleServicesDecoratorFactory,
TimeSpan onCancellationAbandonCompleteScriptAfter)
{
this.logger = logger;
this.tentacleClientObserver = tentacleClientObserver;
this.clientOptions = clientOptions;
this.onCancellationAbandonCompleteScriptAfter = onCancellationAbandonCompleteScriptAfter;
clientsHolder = new ClientsHolder(halibutRuntime, serviceEndPoint, tentacleServicesDecoratorFactory);
rpcCallExecutor = RpcCallExecutorFactory.Create(this.clientOptions.RpcRetrySettings.RetryDuration, this.tentacleClientObserver);
}

public async Task<(ScriptStatus, ICommandContext)> StartScript(ExecuteScriptCommand executeScriptCommand,
StartScriptIsBeingReAttempted startScriptIsBeingReAttempted,
CancellationToken cancellationToken)
{
var operationMetricsBuilder = ClientOperationMetricsBuilder.Start();

// Pick what service to use.
var scriptServiceToUse = await DetermineScriptServiceVersionToUse(cancellationToken, operationMetricsBuilder);


var scriptOrchestratorFactory = GetNewScriptOrchestratorFactory(operationMetricsBuilder);

var orchestrator = scriptOrchestratorFactory.CreateScriptExecutor(scriptServiceToUse);
return await orchestrator.StartScript(executeScriptCommand, startScriptIsBeingReAttempted, cancellationToken);
}

async Task<ScriptServiceVersion> DetermineScriptServiceVersionToUse(CancellationToken cancellationToken, ClientOperationMetricsBuilder operationMetricsBuilder)
{
try
{
return await new ScriptServicePicker(clientsHolder.CapabilitiesServiceV2, logger, rpcCallExecutor, clientOptions, operationMetricsBuilder)
.DetermineScriptServiceVersionToUse(cancellationToken);
}
catch (Exception ex) when (cancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException("Script execution was cancelled", ex);
}
}

public async Task<(ScriptStatus, ICommandContext)> GetStatus(ICommandContext ticketForNextNextStatus, CancellationToken cancellationToken)
{
var operationMetricsBuilder = ClientOperationMetricsBuilder.Start();

var scriptOrchestratorFactory = GetNewScriptOrchestratorFactory(operationMetricsBuilder);

var orchestrator = scriptOrchestratorFactory.CreateScriptExecutor(ticketForNextNextStatus.WhichService);

return await orchestrator.GetStatus(ticketForNextNextStatus, cancellationToken);
}

public async Task<(ScriptStatus, ICommandContext)> CancelScript(ICommandContext ticketForNextNextStatus, CancellationToken cancellationToken)
{
var operationMetricsBuilder = ClientOperationMetricsBuilder.Start();

var scriptOrchestratorFactory = GetNewScriptOrchestratorFactory(operationMetricsBuilder);

var orchestrator = scriptOrchestratorFactory.CreateScriptExecutor(ticketForNextNextStatus.WhichService);

return await orchestrator.CancelScript(ticketForNextNextStatus, cancellationToken);
}

public Task<ScriptStatus?> Finish(ICommandContext commandContext, CancellationToken scriptExecutionCancellationToken)
{
throw new NotImplementedException();
}

public Task<(ScriptStatus, ICommandContext)> CancelScript(ScriptTicket scriptTicket, CancellationToken cancellationToken)
{
var operationMetricsBuilder = ClientOperationMetricsBuilder.Start();
throw new System.NotImplementedException();
}

public async Task<ScriptStatus?> CleanUpScript(ICommandContext ticketForNextNextStatus, CancellationToken cancellationToken)
{
var operationMetricsBuilder = ClientOperationMetricsBuilder.Start();

var scriptOrchestratorFactory = GetNewScriptOrchestratorFactory(operationMetricsBuilder);

var orchestrator = scriptOrchestratorFactory.CreateScriptExecutor(ticketForNextNextStatus.WhichService);

return await orchestrator.CleanUpScript(ticketForNextNextStatus, cancellationToken);
}

ScriptOrchestratorFactory GetNewScriptOrchestratorFactory(ClientOperationMetricsBuilder operationMetricsBuilder)
{
return new ScriptOrchestratorFactory(clientsHolder,
rpcCallExecutor,
operationMetricsBuilder,
onCancellationAbandonCompleteScriptAfter,
clientOptions,
logger);
}

}
}
33 changes: 33 additions & 0 deletions source/Octopus.Tentacle.Client/EventDriven/ICommandContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Octopus.Tentacle.Client.Scripts;
using Octopus.Tentacle.Contracts;

namespace Octopus.Tentacle.Client.EventDriven
{
public interface ICommandContext
{

ScriptTicket ScriptTicket { get; }
long NextLogSequence { get; }
public ScriptServiceVersion WhichService { get; }

// TODO does it actually make sense to all of these properties? Perhaps instead we should just expose this concept of
// serialize the entire thing. That way the driver only needs to know how to save something down for the next time it
// wants to continue script execution.
}

public class DefaultCommandContext : ICommandContext
{
public DefaultCommandContext(ScriptTicket scriptTicket,
long nextLogSequence,
ScriptServiceVersion whichService)
{
ScriptTicket = scriptTicket;
NextLogSequence = nextLogSequence;
WhichService = whichService;
}

public ScriptTicket ScriptTicket { get; }
public long NextLogSequence { get; }
public ScriptServiceVersion WhichService { get; }
}
}
37 changes: 37 additions & 0 deletions source/Octopus.Tentacle.Client/Scripts/IScriptExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Octopus.Tentacle.Client.EventDriven;
using Octopus.Tentacle.Client.Scripts.Models;
using Octopus.Tentacle.Contracts;

namespace Octopus.Tentacle.Client.Scripts
{
public interface IScriptExecutor {
Task<(ScriptStatus, ICommandContext)> StartScript(ExecuteScriptCommand command,
StartScriptIsBeingReAttempted startScriptIsBeingReAttempted,
CancellationToken scriptExecutionCancellationToken);

/// <summary>
/// Returns a status or null when scriptExecutionCancellationToken is null.
/// </summary>
/// <param name="commandContext"></param>
/// <param name="scriptExecutionCancellationToken"></param>
/// <returns></returns>
Task<(ScriptStatus, ICommandContext)> GetStatus(ICommandContext commandContext, CancellationToken scriptExecutionCancellationToken);

Task<(ScriptStatus, ICommandContext)> CancelScript(ICommandContext commandContext, CancellationToken scriptExecutionCancellationToken);

/// <summary>
/// Use this cancel method if only the ScriptTicket is known, e.g. we called StartScript but never got a response.
/// Inheritors
/// </summary>
/// <param name="scriptTicket"></param>
/// <param name="hasStartScriptBeenCalledBefore"></param>
/// <returns></returns>
// Task<(ScriptStatus, ICommandContext)> CancelScript(ScriptTicket scriptTicket, CancellationToken cancellationToken);


Task<ScriptStatus?> CleanUpScript(ICommandContext commandContext, CancellationToken scriptExecutionCancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace Octopus.Tentacle.Client.Scripts
{
interface IScriptOrchestrator
public interface IScriptOrchestrator
{
Task<ScriptExecutionResult> ExecuteScript(ExecuteScriptCommand command, CancellationToken scriptExecutionCancellationToken);
}
Expand Down
Loading