diff --git a/cmd/harness/main.go b/cmd/harness/main.go index 9245502b..75ef0e09 100644 --- a/cmd/harness/main.go +++ b/cmd/harness/main.go @@ -23,12 +23,13 @@ func main() { ) ctx := signals.NewCancelableContext( signals.SetupSignalHandler(signals.ExitCodeTerminated), - signals.NewTimeoutSignal(args.Timeout()), + //signals.NewTimeoutSignal(args.Timeout()), signals.NewConsoleSignal(consoleClient, args.StackRunID()), ) ctrl, err := controller.NewStackRunController( controller.WithStackRun(args.StackRunID()), + controller.WithStackRunStepTimeout(args.Timeout()), controller.WithConsoleClient(consoleClient), controller.WithFetchClient(fetchClient), controller.WithWorkingDir(args.WorkingDir()), diff --git a/pkg/controller/stacks/reconciler.go b/pkg/controller/stacks/reconciler.go index f03605b7..ccc97b94 100644 --- a/pkg/controller/stacks/reconciler.go +++ b/pkg/controller/stacks/reconciler.go @@ -42,7 +42,7 @@ func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client } func (r *StackReconciler) GetPublisher() (string, websocket.Publisher) { - return "stack.event", &socketPublisher{ + return "stack.run.event", &socketPublisher{ stackRunQueue: r.StackQueue, stackRunCache: r.StackCache, } diff --git a/pkg/harness/controller/controller.go b/pkg/harness/controller/controller.go index 0e323b2d..5229a287 100644 --- a/pkg/harness/controller/controller.go +++ b/pkg/harness/controller/controller.go @@ -3,7 +3,6 @@ package controller import ( "cmp" "context" - "errors" "fmt" "slices" "sync" @@ -13,9 +12,9 @@ import ( "k8s.io/klog/v2" "github.com/pluralsh/deployment-operator/pkg/harness/environment" - internalerrors "github.com/pluralsh/deployment-operator/pkg/harness/errors" "github.com/pluralsh/deployment-operator/pkg/harness/exec" "github.com/pluralsh/deployment-operator/pkg/harness/sink" + "github.com/pluralsh/deployment-operator/pkg/harness/stackrun" "github.com/pluralsh/deployment-operator/pkg/harness/tool" "github.com/pluralsh/deployment-operator/pkg/log" ) @@ -64,7 +63,7 @@ func (in *stackRunController) Start(ctx context.Context) (retErr error) { // In case of any error finish the execution and return error. case err := <-in.errChan: retErr = err - // If execution finished successfully return. + // If execution finished successfully return without error. case <-in.finishedChan: retErr = nil } @@ -79,57 +78,6 @@ func (in *stackRunController) Start(ctx context.Context) (retErr error) { return in.postStart(retErr) } -func (in *stackRunController) preStart() { - if in.stackRun.Status != gqlclient.StackStatusPending && !environment.IsDev() { - klog.Fatalf("could not start stack run: invalid status: %s", in.stackRun.Status) - } - - if err := in.markStackRun(gqlclient.StackStatusRunning); err != nil { - klog.ErrorS(err, "could not update stack run status") - } -} - -func (in *stackRunController) postStart(err error) error { - var status gqlclient.StackStatus - - switch { - case err == nil: - status = gqlclient.StackStatusSuccessful - case errors.Is(err, internalerrors.ErrRemoteCancel): - status = gqlclient.StackStatusCancelled - default: - status = gqlclient.StackStatusFailed - } - - if err := in.completeStackRun(status, err); err != nil { - klog.ErrorS(err, "could not complete stack run") - } - - klog.V(log.LogLevelInfo).InfoS("stack run completed") - return err -} - -func (in *stackRunController) postStepRun(id string, err error) { - var status gqlclient.StepStatus - - switch { - case err == nil: - status = gqlclient.StepStatusSuccessful - default: - status = gqlclient.StepStatusFailed - } - - if err := in.markStackRunStep(id, status); err != nil { - klog.ErrorS(err, "could not update stack run step status") - } -} - -func (in *stackRunController) preStepRun(id string) { - if err := in.markStackRunStep(id, gqlclient.StepStatusRunning); err != nil { - klog.ErrorS(err, "could not update stack run status") - } -} - func (in *stackRunController) executables(ctx context.Context) []exec.Executable { // Ensure that steps are sorted in the correct order slices.SortFunc(in.stackRun.Steps, func(s1, s2 *gqlclient.RunStepFragment) int { @@ -162,10 +110,10 @@ func (in *stackRunController) toExecutable(ctx context.Context, step *gqlclient. )..., ) - argsModifier := in.tool.Modifier(step.Stage) + modifier := in.tool.Modifier(step.Stage) args := step.Args - if argsModifier != nil { - args = argsModifier.Args(args) + if modifier != nil { + args = modifier.Args(args) } return exec.NewExecutable( @@ -175,6 +123,8 @@ func (in *stackRunController) toExecutable(ctx context.Context, step *gqlclient. exec.WithArgs(args), exec.WithID(step.ID), exec.WithLogSink(consoleWriter), + exec.WithHook(stackrun.LifecyclePreStart, in.preExecHook(step.Stage, step.ID)), + exec.WithHook(stackrun.LifecyclePostStart, in.postExecHook(step.Stage, step.ID)), ) } @@ -265,7 +215,7 @@ func NewStackRunController(options ...Option) (Controller, error) { ctrl.executor = newExecutor( errChan, finishedChan, - WithPreRunFunc(ctrl.preStepRun), + //WithPreRunFunc(ctrl.preStepRun), WithPostRunFunc(ctrl.postStepRun), ) diff --git a/pkg/harness/controller/controller_hooks.go b/pkg/harness/controller/controller_hooks.go new file mode 100644 index 00000000..982e81e2 --- /dev/null +++ b/pkg/harness/controller/controller_hooks.go @@ -0,0 +1,129 @@ +package controller + +import ( + "context" + "errors" + "time" + + gqlclient "github.com/pluralsh/console-client-go" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + + "github.com/pluralsh/deployment-operator/pkg/harness/environment" + internalerrors "github.com/pluralsh/deployment-operator/pkg/harness/errors" + "github.com/pluralsh/deployment-operator/pkg/harness/stackrun" + "github.com/pluralsh/deployment-operator/pkg/log" +) + +var ( + runApproved = false +) + +func (in *stackRunController) preStart() { + if in.stackRun.Status != gqlclient.StackStatusPending && !environment.IsDev() { + klog.Fatalf("could not start stack run: invalid status: %s", in.stackRun.Status) + } + + if err := in.markStackRun(gqlclient.StackStatusRunning); err != nil { + klog.ErrorS(err, "could not update stack run status") + } +} + +func (in *stackRunController) postStart(err error) error { + var status gqlclient.StackStatus + + switch { + case err == nil: + status = gqlclient.StackStatusSuccessful + case errors.Is(err, internalerrors.ErrRemoteCancel): + status = gqlclient.StackStatusCancelled + default: + status = gqlclient.StackStatusFailed + } + + if err := in.completeStackRun(status, err); err != nil { + klog.ErrorS(err, "could not complete stack run") + } + + klog.V(log.LogLevelInfo).InfoS("stack run completed") + return err +} + +func (in *stackRunController) postStepRun(id string, err error) { + var status gqlclient.StepStatus + + switch { + case err == nil: + status = gqlclient.StepStatusSuccessful + default: + status = gqlclient.StepStatusFailed + } + + if err := in.markStackRunStep(id, status); err != nil { + klog.ErrorS(err, "could not update stack run step status") + } +} + +func (in *stackRunController) preStepRun(id string) { + if err := in.markStackRunStep(id, gqlclient.StepStatusRunning); err != nil { + klog.ErrorS(err, "could not update stack run status") + } +} + +func (in *stackRunController) postExecHook(stage gqlclient.StepStage, id string) stackrun.HookFunction { + return func() error { + if stage != gqlclient.StepStagePlan { + return nil + } + + return in.uploadPlan() + } +} + +func (in *stackRunController) preExecHook(stage gqlclient.StepStage, id string) stackrun.HookFunction { + return func() error { + if stage == gqlclient.StepStageApply { + if err := in.approvalCheck(); err != nil { + return err + } + } + + if err := in.markStackRunStep(id, gqlclient.StepStatusRunning); err != nil { + klog.ErrorS(err, "could not update stack run status") + } + return nil + } +} + +func (in *stackRunController) approvalCheck() error { + if !in.stackRun.Approval || runApproved { + return nil + } + + return wait.PollUntilContextCancel(context.Background(), 5*time.Second, true, func(_ context.Context) (done bool, err error) { + if runApproved { + return true, nil + } + + stack, err := in.consoleClient.GetStackRun(in.stackRunID) + if err != nil { + klog.ErrorS(err, "could not check stack run approval") + return false, nil + } + + runApproved = stack.ApprovedAt != nil + return runApproved, nil + }) +} + +func (in *stackRunController) uploadPlan() error { + state, err := in.tool.Plan() + if err != nil { + klog.ErrorS(err, "could not prepare plan") + } + + return in.consoleClient.UpdateStackRun(in.stackRunID, gqlclient.StackRunAttributes{ + State: state, + Status: gqlclient.StackStatusRunning, + }) +} diff --git a/pkg/harness/controller/controller_options.go b/pkg/harness/controller/controller_options.go index 33cc1c90..ac5efc05 100644 --- a/pkg/harness/controller/controller_options.go +++ b/pkg/harness/controller/controller_options.go @@ -1,6 +1,8 @@ package controller import ( + "time" + "github.com/pluralsh/deployment-operator/internal/helpers" console "github.com/pluralsh/deployment-operator/pkg/client" "github.com/pluralsh/deployment-operator/pkg/harness/sink" @@ -35,3 +37,9 @@ func WithSinkOptions(options ...sink.Option) Option { s.sinkOptions = options } } + +func WithStackRunStepTimeout(timeout time.Duration) Option { + return func(s *stackRunController) { + s.stackRunStepTimeout = timeout + } +} diff --git a/pkg/harness/controller/controller_types.go b/pkg/harness/controller/controller_types.go index b5ea4cca..2e58d58d 100644 --- a/pkg/harness/controller/controller_types.go +++ b/pkg/harness/controller/controller_types.go @@ -3,6 +3,7 @@ package controller import ( "context" "sync" + "time" "github.com/pluralsh/deployment-operator/internal/helpers" console "github.com/pluralsh/deployment-operator/pkg/client" @@ -30,6 +31,9 @@ type stackRunController struct { // stackRunID stackRunID string + // stackRunStepTimeout + stackRunStepTimeout time.Duration + // stackRun stackRun *stackrun.StackRun diff --git a/pkg/harness/controller/executor.go b/pkg/harness/controller/executor.go index 458e4687..c5c9cf36 100644 --- a/pkg/harness/controller/executor.go +++ b/pkg/harness/controller/executor.go @@ -8,6 +8,7 @@ import ( "k8s.io/klog/v2" "github.com/pluralsh/deployment-operator/pkg/harness/exec" + "github.com/pluralsh/deployment-operator/pkg/harness/stackrun" "github.com/pluralsh/deployment-operator/pkg/log" ) @@ -144,12 +145,21 @@ func (in *executor) dequeue(executable exec.Executable) (empty bool) { return len(in.startQueue) == 0 } +func (in *executor) runLifecycleFunction(lifecycle stackrun.Lifecycle) error { + if fn, exists := in.hookFunctions[lifecycle]; exists { + return fn() + } + + return nil +} + func newExecutor(errChan chan error, finishedChan chan struct{}, options ...ExecutorOption) *executor { result := &executor{ - errChan: errChan, - finishedChan: finishedChan, - strategy: ExecutionStrategyOrdered, - ch: make(chan exec.Executable), + errChan: errChan, + finishedChan: finishedChan, + strategy: ExecutionStrategyOrdered, + ch: make(chan exec.Executable), + hookFunctions: make(map[stackrun.Lifecycle]stackrun.HookFunction), } for _, option := range options { diff --git a/pkg/harness/controller/executor_options.go b/pkg/harness/controller/executor_options.go index 668350ca..2e52d429 100644 --- a/pkg/harness/controller/executor_options.go +++ b/pkg/harness/controller/executor_options.go @@ -1,5 +1,9 @@ package controller +import ( + "github.com/pluralsh/deployment-operator/pkg/harness/stackrun" +) + func WithExecutionStrategy(strategy ExecutionStrategy) ExecutorOption { return func(e *executor) { e.strategy = strategy @@ -17,3 +21,9 @@ func WithPreRunFunc(fn func(string)) ExecutorOption { e.preRunFunc = fn } } + +func WithHook(lifecycle stackrun.Lifecycle, fn stackrun.HookFunction) ExecutorOption { + return func(e *executor) { + e.hookFunctions[lifecycle] = fn + } +} diff --git a/pkg/harness/controller/executor_types.go b/pkg/harness/controller/executor_types.go index b4afbfbc..8ff983d4 100644 --- a/pkg/harness/controller/executor_types.go +++ b/pkg/harness/controller/executor_types.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/pluralsh/deployment-operator/pkg/harness/exec" + "github.com/pluralsh/deployment-operator/pkg/harness/stackrun" ) type executor struct { @@ -41,6 +42,9 @@ type executor struct { // It is used only by the ExecutionStrategyOrdered to ensure ordered // run of the executables. ch chan exec.Executable + + // hookFunctions ... + hookFunctions map[stackrun.Lifecycle]stackrun.HookFunction } type ExecutorOption func(*executor) diff --git a/pkg/harness/exec/exec.go b/pkg/harness/exec/exec.go index ff7e1f96..9155ee17 100644 --- a/pkg/harness/exec/exec.go +++ b/pkg/harness/exec/exec.go @@ -11,6 +11,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/klog/v2" + "github.com/pluralsh/deployment-operator/pkg/harness/stackrun" "github.com/pluralsh/deployment-operator/pkg/log" ) @@ -37,8 +38,16 @@ func (in *executable) Run(ctx context.Context) error { cmd.Dir = in.workingDirectory } + if err := in.runLifecycleFunction(stackrun.LifecyclePreStart); err != nil { + return err + } + klog.V(log.LogLevelExtended).InfoS("executing", "command", in.Command()) - return cmd.Run() + if err := cmd.Run(); err != nil { + return err + } + + return in.runLifecycleFunction(stackrun.LifecyclePostStart) } func (in *executable) RunWithOutput(ctx context.Context) ([]byte, error) { @@ -87,11 +96,20 @@ func (in *executable) close(w io.WriteCloser) { } } +func (in *executable) runLifecycleFunction(lifecycle stackrun.Lifecycle) error { + if fn, exists := in.hookFunctions[lifecycle]; exists { + return fn() + } + + return nil +} + func NewExecutable(command string, options ...Option) Executable { result := &executable{ - command: command, - args: make([]string, 0), - env: make([]string, 0), + command: command, + args: make([]string, 0), + env: make([]string, 0), + hookFunctions: make(map[stackrun.Lifecycle]stackrun.HookFunction), } for _, o := range options { diff --git a/pkg/harness/exec/exec_options.go b/pkg/harness/exec/exec_options.go index 6c1e6dd7..0296e6b6 100644 --- a/pkg/harness/exec/exec_options.go +++ b/pkg/harness/exec/exec_options.go @@ -2,6 +2,8 @@ package exec import ( "io" + + "github.com/pluralsh/deployment-operator/pkg/harness/stackrun" ) func WithDir(workingDirectory string) Option { @@ -33,3 +35,9 @@ func WithID(id string) Option { e.id = id } } + +func WithHook(lifecycle stackrun.Lifecycle, fn stackrun.HookFunction) Option { + return func(e *executable) { + e.hookFunctions[lifecycle] = fn + } +} diff --git a/pkg/harness/exec/exec_types.go b/pkg/harness/exec/exec_types.go index 70d6b0bb..e4da286c 100644 --- a/pkg/harness/exec/exec_types.go +++ b/pkg/harness/exec/exec_types.go @@ -3,6 +3,8 @@ package exec import ( "context" "io" + + "github.com/pluralsh/deployment-operator/pkg/harness/stackrun" ) type Executable interface { @@ -37,6 +39,9 @@ type executable struct { // executable output. It does not stop output from being forwarded // to the os.Stdout. logSink io.WriteCloser + + // hookFunctions ... + hookFunctions map[stackrun.Lifecycle]stackrun.HookFunction } type Option func(*executable) diff --git a/pkg/harness/stackrun/types.go b/pkg/harness/stackrun/types.go index f98cbca8..372f1cab 100644 --- a/pkg/harness/stackrun/types.go +++ b/pkg/harness/stackrun/types.go @@ -14,6 +14,8 @@ type StackRun struct { Steps []*gqlclient.RunStepFragment Files []*gqlclient.StackFileFragment Environment []*gqlclient.StackEnvironmentFragment + Approval bool + ApprovedAt *string } func (in *StackRun) FromStackRunBaseFragment(fragment *gqlclient.StackRunBaseFragment) *StackRun { @@ -25,6 +27,8 @@ func (in *StackRun) FromStackRunBaseFragment(fragment *gqlclient.StackRunBaseFra Steps: fragment.Steps, Files: fragment.Files, Environment: fragment.Environment, + Approval: fragment.Approval != nil && *fragment.Approval, + ApprovedAt: fragment.ApprovedAt, } } @@ -39,3 +43,12 @@ func (in *StackRun) Env() []string { return result } + +type Lifecycle string + +const ( + LifecyclePreStart Lifecycle = "pre-start" + LifecyclePostStart Lifecycle = "post-start" +) + +type HookFunction func() error diff --git a/pkg/harness/tool/ansible/ansible.go b/pkg/harness/tool/ansible/ansible.go index 13408910..c5d89898 100644 --- a/pkg/harness/tool/ansible/ansible.go +++ b/pkg/harness/tool/ansible/ansible.go @@ -6,6 +6,11 @@ import ( v1 "github.com/pluralsh/deployment-operator/pkg/harness/tool/v1" ) +func (in *Ansible) Plan() (*console.StackStateAttributes, error) { + // TODO implement me + panic("implement me") +} + func (in *Ansible) State() (*console.StackStateAttributes, error) { // TODO implement me panic("implement me") diff --git a/pkg/harness/tool/terraform/terraform.go b/pkg/harness/tool/terraform/terraform.go index 7c26722a..ef997765 100644 --- a/pkg/harness/tool/terraform/terraform.go +++ b/pkg/harness/tool/terraform/terraform.go @@ -20,18 +20,12 @@ import ( // State implements v1.Tool interface. func (in *Terraform) State() (*console.StackStateAttributes, error) { - plan, err := in.plan() - if err != nil { - return nil, err - } - state, err := in.state() if err != nil { return nil, err } return &console.StackStateAttributes{ - Plan: &plan, State: algorithms.Map( state.Resources, func(r v4.Resource) *console.StackStateResourceAttributes { @@ -40,6 +34,17 @@ func (in *Terraform) State() (*console.StackStateAttributes, error) { }, nil } +func (in *Terraform) Plan() (*console.StackStateAttributes, error) { + plan, err := in.plan() + if err != nil { + return nil, err + } + + return &console.StackStateAttributes{ + Plan: &plan, + }, nil +} + // Output implements v1.Tool interface. func (in *Terraform) Output() ([]*console.StackOutputAttributes, error) { result := make([]*console.StackOutputAttributes, 0) @@ -103,7 +108,7 @@ func (in *Terraform) state() (*v4.State, error) { func (in *Terraform) plan() (string, error) { output, err := exec.NewExecutable( "terraform", - exec.WithArgs([]string{"show", "-json", in.planFileName}), + exec.WithArgs([]string{"show", in.planFileName}), exec.WithDir(in.dir), ).RunWithOutput(context.Background()) if err != nil { diff --git a/pkg/harness/tool/v1/types.go b/pkg/harness/tool/v1/types.go index 75032917..1253cff7 100644 --- a/pkg/harness/tool/v1/types.go +++ b/pkg/harness/tool/v1/types.go @@ -11,6 +11,8 @@ import ( // - gathering any available outputs from local files // - providing runtime modifiers to alter step command execution arguments, etc. type Tool interface { + // Plan ... + Plan() (*console.StackStateAttributes, error) // State tries to assemble state/plan information based on local files // created by specific tool after all steps are finished running. It then // transforms this information into gqlclient.StackStateAttributes.