Skip to content

Commit

Permalink
feat(harness): support approval flow and more (#194)
Browse files Browse the repository at this point in the history
* wait for approval, upload human-readable tf plan after plan stage  and remove global timeout for harness run

* omit arm64 for harness

* initialize the map

* set stack status on plan upload

* memoize approval state (won't change once approved)

---------

Co-authored-by: michaeljguarino <[email protected]>
  • Loading branch information
floreks and michaeljguarino authored May 25, 2024
1 parent fdd464a commit 45b3942
Show file tree
Hide file tree
Showing 16 changed files with 247 additions and 75 deletions.
3 changes: 2 additions & 1 deletion cmd/harness/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ func main() {
)
ctx := signals.NewCancelableContext(
signals.SetupSignalHandler(signals.ExitCodeTerminated),
signals.NewTimeoutSignal(args.Timeout()),
//signals.NewTimeoutSignal(args.Timeout()),

Check failure on line 26 in cmd/harness/main.go

View workflow job for this annotation

GitHub Actions / Lint

commentFormatting: put a space between `//` and comment text (gocritic)
signals.NewConsoleSignal(consoleClient, args.StackRunID()),
)

ctrl, err := controller.NewStackRunController(
controller.WithStackRun(args.StackRunID()),
controller.WithStackRunStepTimeout(args.Timeout()),
controller.WithConsoleClient(consoleClient),
controller.WithFetchClient(fetchClient),
controller.WithWorkingDir(args.WorkingDir()),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/stacks/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewStackReconciler(consoleClient client.Client, k8sClient ctrlclient.Client
}

func (r *StackReconciler) GetPublisher() (string, websocket.Publisher) {
return "stack.event", &socketPublisher{
return "stack.run.event", &socketPublisher{
stackRunQueue: r.StackQueue,
stackRunCache: r.StackCache,
}
Expand Down
66 changes: 8 additions & 58 deletions pkg/harness/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controller
import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"sync"
Expand All @@ -13,9 +12,9 @@ import (
"k8s.io/klog/v2"

"github.com/pluralsh/deployment-operator/pkg/harness/environment"
internalerrors "github.com/pluralsh/deployment-operator/pkg/harness/errors"
"github.com/pluralsh/deployment-operator/pkg/harness/exec"
"github.com/pluralsh/deployment-operator/pkg/harness/sink"
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
"github.com/pluralsh/deployment-operator/pkg/harness/tool"
"github.com/pluralsh/deployment-operator/pkg/log"
)
Expand Down Expand Up @@ -64,7 +63,7 @@ func (in *stackRunController) Start(ctx context.Context) (retErr error) {
// In case of any error finish the execution and return error.
case err := <-in.errChan:
retErr = err
// If execution finished successfully return.
// If execution finished successfully return without error.
case <-in.finishedChan:
retErr = nil
}
Expand All @@ -79,57 +78,6 @@ func (in *stackRunController) Start(ctx context.Context) (retErr error) {
return in.postStart(retErr)
}

func (in *stackRunController) preStart() {
if in.stackRun.Status != gqlclient.StackStatusPending && !environment.IsDev() {
klog.Fatalf("could not start stack run: invalid status: %s", in.stackRun.Status)
}

if err := in.markStackRun(gqlclient.StackStatusRunning); err != nil {
klog.ErrorS(err, "could not update stack run status")
}
}

func (in *stackRunController) postStart(err error) error {
var status gqlclient.StackStatus

switch {
case err == nil:
status = gqlclient.StackStatusSuccessful
case errors.Is(err, internalerrors.ErrRemoteCancel):
status = gqlclient.StackStatusCancelled
default:
status = gqlclient.StackStatusFailed
}

if err := in.completeStackRun(status, err); err != nil {
klog.ErrorS(err, "could not complete stack run")
}

klog.V(log.LogLevelInfo).InfoS("stack run completed")
return err
}

func (in *stackRunController) postStepRun(id string, err error) {
var status gqlclient.StepStatus

switch {
case err == nil:
status = gqlclient.StepStatusSuccessful
default:
status = gqlclient.StepStatusFailed
}

if err := in.markStackRunStep(id, status); err != nil {
klog.ErrorS(err, "could not update stack run step status")
}
}

func (in *stackRunController) preStepRun(id string) {
if err := in.markStackRunStep(id, gqlclient.StepStatusRunning); err != nil {
klog.ErrorS(err, "could not update stack run status")
}
}

func (in *stackRunController) executables(ctx context.Context) []exec.Executable {
// Ensure that steps are sorted in the correct order
slices.SortFunc(in.stackRun.Steps, func(s1, s2 *gqlclient.RunStepFragment) int {
Expand Down Expand Up @@ -162,10 +110,10 @@ func (in *stackRunController) toExecutable(ctx context.Context, step *gqlclient.
)...,
)

argsModifier := in.tool.Modifier(step.Stage)
modifier := in.tool.Modifier(step.Stage)
args := step.Args
if argsModifier != nil {
args = argsModifier.Args(args)
if modifier != nil {
args = modifier.Args(args)
}

return exec.NewExecutable(
Expand All @@ -175,6 +123,8 @@ func (in *stackRunController) toExecutable(ctx context.Context, step *gqlclient.
exec.WithArgs(args),
exec.WithID(step.ID),
exec.WithLogSink(consoleWriter),
exec.WithHook(stackrun.LifecyclePreStart, in.preExecHook(step.Stage, step.ID)),
exec.WithHook(stackrun.LifecyclePostStart, in.postExecHook(step.Stage, step.ID)),
)
}

Expand Down Expand Up @@ -265,7 +215,7 @@ func NewStackRunController(options ...Option) (Controller, error) {
ctrl.executor = newExecutor(
errChan,
finishedChan,
WithPreRunFunc(ctrl.preStepRun),
//WithPreRunFunc(ctrl.preStepRun),

Check failure on line 218 in pkg/harness/controller/controller.go

View workflow job for this annotation

GitHub Actions / Lint

commentFormatting: put a space between `//` and comment text (gocritic)
WithPostRunFunc(ctrl.postStepRun),
)

Expand Down
129 changes: 129 additions & 0 deletions pkg/harness/controller/controller_hooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package controller

import (
"context"
"errors"
"time"

gqlclient "github.com/pluralsh/console-client-go"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"github.com/pluralsh/deployment-operator/pkg/harness/environment"
internalerrors "github.com/pluralsh/deployment-operator/pkg/harness/errors"
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
"github.com/pluralsh/deployment-operator/pkg/log"
)

var (
runApproved = false
)

func (in *stackRunController) preStart() {
if in.stackRun.Status != gqlclient.StackStatusPending && !environment.IsDev() {
klog.Fatalf("could not start stack run: invalid status: %s", in.stackRun.Status)
}

if err := in.markStackRun(gqlclient.StackStatusRunning); err != nil {
klog.ErrorS(err, "could not update stack run status")
}
}

func (in *stackRunController) postStart(err error) error {
var status gqlclient.StackStatus

switch {
case err == nil:
status = gqlclient.StackStatusSuccessful
case errors.Is(err, internalerrors.ErrRemoteCancel):
status = gqlclient.StackStatusCancelled
default:
status = gqlclient.StackStatusFailed
}

if err := in.completeStackRun(status, err); err != nil {
klog.ErrorS(err, "could not complete stack run")
}

klog.V(log.LogLevelInfo).InfoS("stack run completed")
return err
}

func (in *stackRunController) postStepRun(id string, err error) {
var status gqlclient.StepStatus

switch {
case err == nil:
status = gqlclient.StepStatusSuccessful
default:
status = gqlclient.StepStatusFailed
}

if err := in.markStackRunStep(id, status); err != nil {
klog.ErrorS(err, "could not update stack run step status")
}
}

func (in *stackRunController) preStepRun(id string) {

Check failure on line 67 in pkg/harness/controller/controller_hooks.go

View workflow job for this annotation

GitHub Actions / Lint

func `(*stackRunController).preStepRun` is unused (unused)
if err := in.markStackRunStep(id, gqlclient.StepStatusRunning); err != nil {
klog.ErrorS(err, "could not update stack run status")
}
}

func (in *stackRunController) postExecHook(stage gqlclient.StepStage, id string) stackrun.HookFunction {
return func() error {
if stage != gqlclient.StepStagePlan {
return nil
}

return in.uploadPlan()
}
}

func (in *stackRunController) preExecHook(stage gqlclient.StepStage, id string) stackrun.HookFunction {
return func() error {
if stage == gqlclient.StepStageApply {
if err := in.approvalCheck(); err != nil {
return err
}
}

if err := in.markStackRunStep(id, gqlclient.StepStatusRunning); err != nil {
klog.ErrorS(err, "could not update stack run status")
}
return nil
}
}

func (in *stackRunController) approvalCheck() error {
if !in.stackRun.Approval || runApproved {
return nil
}

return wait.PollUntilContextCancel(context.Background(), 5*time.Second, true, func(_ context.Context) (done bool, err error) {
if runApproved {
return true, nil
}

stack, err := in.consoleClient.GetStackRun(in.stackRunID)
if err != nil {
klog.ErrorS(err, "could not check stack run approval")
return false, nil
}

runApproved = stack.ApprovedAt != nil
return runApproved, nil
})
}

func (in *stackRunController) uploadPlan() error {
state, err := in.tool.Plan()
if err != nil {
klog.ErrorS(err, "could not prepare plan")
}

return in.consoleClient.UpdateStackRun(in.stackRunID, gqlclient.StackRunAttributes{
State: state,
Status: gqlclient.StackStatusRunning,
})
}
8 changes: 8 additions & 0 deletions pkg/harness/controller/controller_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package controller

import (
"time"

"github.com/pluralsh/deployment-operator/internal/helpers"
console "github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/harness/sink"
Expand Down Expand Up @@ -35,3 +37,9 @@ func WithSinkOptions(options ...sink.Option) Option {
s.sinkOptions = options
}
}

func WithStackRunStepTimeout(timeout time.Duration) Option {
return func(s *stackRunController) {
s.stackRunStepTimeout = timeout
}
}
4 changes: 4 additions & 0 deletions pkg/harness/controller/controller_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"sync"
"time"

"github.com/pluralsh/deployment-operator/internal/helpers"
console "github.com/pluralsh/deployment-operator/pkg/client"
Expand Down Expand Up @@ -30,6 +31,9 @@ type stackRunController struct {
// stackRunID
stackRunID string

// stackRunStepTimeout
stackRunStepTimeout time.Duration

// stackRun
stackRun *stackrun.StackRun

Expand Down
18 changes: 14 additions & 4 deletions pkg/harness/controller/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"k8s.io/klog/v2"

"github.com/pluralsh/deployment-operator/pkg/harness/exec"
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
"github.com/pluralsh/deployment-operator/pkg/log"
)

Expand Down Expand Up @@ -144,12 +145,21 @@ func (in *executor) dequeue(executable exec.Executable) (empty bool) {
return len(in.startQueue) == 0
}

func (in *executor) runLifecycleFunction(lifecycle stackrun.Lifecycle) error {

Check failure on line 148 in pkg/harness/controller/executor.go

View workflow job for this annotation

GitHub Actions / Lint

func `(*executor).runLifecycleFunction` is unused (unused)
if fn, exists := in.hookFunctions[lifecycle]; exists {
return fn()
}

return nil
}

func newExecutor(errChan chan error, finishedChan chan struct{}, options ...ExecutorOption) *executor {
result := &executor{
errChan: errChan,
finishedChan: finishedChan,
strategy: ExecutionStrategyOrdered,
ch: make(chan exec.Executable),
errChan: errChan,
finishedChan: finishedChan,
strategy: ExecutionStrategyOrdered,
ch: make(chan exec.Executable),
hookFunctions: make(map[stackrun.Lifecycle]stackrun.HookFunction),
}

for _, option := range options {
Expand Down
10 changes: 10 additions & 0 deletions pkg/harness/controller/executor_options.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package controller

import (
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
)

func WithExecutionStrategy(strategy ExecutionStrategy) ExecutorOption {
return func(e *executor) {
e.strategy = strategy
Expand All @@ -17,3 +21,9 @@ func WithPreRunFunc(fn func(string)) ExecutorOption {
e.preRunFunc = fn
}
}

func WithHook(lifecycle stackrun.Lifecycle, fn stackrun.HookFunction) ExecutorOption {
return func(e *executor) {
e.hookFunctions[lifecycle] = fn
}
}
4 changes: 4 additions & 0 deletions pkg/harness/controller/executor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

"github.com/pluralsh/deployment-operator/pkg/harness/exec"
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
)

type executor struct {
Expand Down Expand Up @@ -41,6 +42,9 @@ type executor struct {
// It is used only by the ExecutionStrategyOrdered to ensure ordered
// run of the executables.
ch chan exec.Executable

// hookFunctions ...
hookFunctions map[stackrun.Lifecycle]stackrun.HookFunction
}

type ExecutorOption func(*executor)
Expand Down
Loading

0 comments on commit 45b3942

Please sign in to comment.