Skip to content

Commit

Permalink
feat: Properly handle all controller errors (#196)
Browse files Browse the repository at this point in the history
* Properly handle all controller errors

It looks like there might not be error handling for issues that happen before execution. This change handles those cases a bit better, in lieu of a more robust way of reporting run completion.

* add workdir support

* switch to chainguard wolfi base image for the package manager

* simplify ordered execution logic

* refactor code and update apply args modifier

* cleanup, add workdir arg handling to job controller, ensure postStart hook on ctrl start early return

* fix lint

* fix import cycle

* fix lint

* set step status to failed if stack run fails or is cancelled

* mark stack run as pending approval when approval is required and plan has been uploaded

* properly handle stack run statuses before and after approval

* properly handle stack run statuses before and after approval

* simplify ordered execution logic

* fix lint

* use custom exec work dir from stack and restore controller cleanup via finish fn

* create additional files in the stack workdir

* revert job.go change

* run post start on controller finish

---------

Co-authored-by: Sebastian Florek <[email protected]>
  • Loading branch information
michaeljguarino and floreks authored May 27, 2024
1 parent 45b3942 commit f5daa01
Show file tree
Hide file tree
Showing 26 changed files with 216 additions and 167 deletions.
4 changes: 1 addition & 3 deletions cmd/harness/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func main() {
)
ctx := signals.NewCancelableContext(
signals.SetupSignalHandler(signals.ExitCodeTerminated),
//signals.NewTimeoutSignal(args.Timeout()),
signals.NewConsoleSignal(consoleClient, args.StackRunID()),
)

Expand All @@ -43,13 +42,12 @@ func main() {
}

if err = ctrl.Start(ctx); err != nil {
_ = ctrl.Finish(err)
handleFatalError(err)
}
}

func handleFatalError(err error) {
// TODO: initiate a graceful shutdown procedure

switch {
case errors.Is(err, internalerrors.ErrTimeout):
klog.ErrorS(err, "timed out waiting for stack run to complete", "timeout", args.Timeout())
Expand Down
8 changes: 5 additions & 3 deletions dockerfiles/harness/base.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ FROM busybox:1.35.0-uclibc as environment
RUN mkdir /plural
RUN mkdir /tmp/plural

FROM gcr.io/distroless/base-debian12:nonroot as final
FROM cgr.dev/chainguard/wolfi-base:latest as final

# Switch to the nonroot user
USER nonroot:nonroot
RUN apk update --no-cache && apk add git

# Switch to the nonroot user
USER nonroot

# Set up the environment
# 1. copy plural and tmp directories with proper permissions for the nonroot user
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/open-policy-agent/gatekeeper/v3 v3.15.1
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/pkg/errors v0.9.1
github.com/pluralsh/console-client-go v0.5.8
github.com/pluralsh/console-client-go v0.5.9
github.com/pluralsh/controller-reconcile-helper v0.0.4
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34
github.com/pluralsh/polly v0.1.10
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pluralsh/console-client-go v0.5.8 h1:Qm7vS+gCbmWqy5i4saLPc5/SUZaW6RCzxWF+uxyPA+Y=
github.com/pluralsh/console-client-go v0.5.8/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
github.com/pluralsh/console-client-go v0.5.9 h1:r5YMD4dU2zWiDApWtqu45l/02X4RnsNeVEFzuuyehEA=
github.com/pluralsh/console-client-go v0.5.9/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
github.com/pluralsh/controller-reconcile-helper v0.0.4 h1:1o+7qYSyoeqKFjx+WgQTxDz4Q2VMpzprJIIKShxqG0E=
github.com/pluralsh/controller-reconcile-helper v0.0.4/go.mod h1:AfY0gtteD6veBjmB6jiRx/aR4yevEf6K0M13/pGan/s=
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw=
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/stackrunjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (r *StackRunJobReconciler) getStepStatusUpdate(stackStatus console.StackSta
}

if stackStatus == console.StackStatusFailed || stackStatus == console.StackStatusCancelled {
return lo.ToPtr(console.StepStatusSuccessful)
return lo.ToPtr(console.StepStatusFailed)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/pluralsh/deployment-operator/api/v1alpha1"
"github.com/pluralsh/deployment-operator/internal/helpers"
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
v1 "github.com/pluralsh/deployment-operator/pkg/harness/stackrun/v1"
)

var lock = &sync.Mutex{}
Expand Down Expand Up @@ -69,7 +69,7 @@ type Client interface {
UpsertConstraints(constraints []*console.PolicyConstraintAttributes) (*console.UpsertPolicyConstraints, error)
GetNamespace(id string) (*console.ManagedNamespaceFragment, error)
ListNamespaces(after *string, first *int64) (*console.ListClusterNamespaces_ClusterManagedNamespaces, error)
GetStackRunBase(id string) (*stackrun.StackRun, error)
GetStackRunBase(id string) (*v1.StackRun, error)
GetStackRun(id string) (*console.StackRunFragment, error)
AddStackRunLogs(id, logs string) error
CompleteStackRun(id string, attributes console.StackRunAttributes) error
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (

internalerrors "github.com/pluralsh/deployment-operator/internal/errors"
"github.com/pluralsh/deployment-operator/pkg/harness/errors"
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
v1 "github.com/pluralsh/deployment-operator/pkg/harness/stackrun/v1"
"github.com/pluralsh/deployment-operator/pkg/log"
)

func (c *client) GetStackRunBase(id string) (result *stackrun.StackRun, err error) {
func (c *client) GetStackRunBase(id string) (result *v1.StackRun, err error) {
stackRun, err := c.consoleClient.GetStackRunBase(c.ctx, id)
if err != nil && !internalerrors.IsNotFound(err) {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/stacks/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (r *StackReconciler) GenerateRunJob(run *console.StackRunFragment, name str
jobSpec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever

jobSpec.BackoffLimit = lo.ToPtr(int32(0))
jobSpec.TTLSecondsAfterFinished = lo.ToPtr(int32(60 * 60))

jobSpec.Template.Spec.Containers = r.ensureDefaultContainer(jobSpec.Template.Spec.Containers, run)

Expand Down
73 changes: 44 additions & 29 deletions pkg/harness/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"cmp"
"context"
"fmt"
"path"
"slices"
"sync"

Expand All @@ -14,7 +15,7 @@ import (
"github.com/pluralsh/deployment-operator/pkg/harness/environment"
"github.com/pluralsh/deployment-operator/pkg/harness/exec"
"github.com/pluralsh/deployment-operator/pkg/harness/sink"
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
v1 "github.com/pluralsh/deployment-operator/pkg/harness/stackrun/v1"
"github.com/pluralsh/deployment-operator/pkg/harness/tool"
"github.com/pluralsh/deployment-operator/pkg/log"
)
Expand Down Expand Up @@ -78,6 +79,14 @@ func (in *stackRunController) Start(ctx context.Context) (retErr error) {
return in.postStart(retErr)
}

// Finish forwards a non-nil stackRunErr to postStart and returns whatever
// postStart returns. When stackRunErr is nil there is nothing to report and
// nil is returned without calling postStart.
func (in *stackRunController) Finish(stackRunErr error) error {
	if stackRunErr != nil {
		return in.postStart(stackRunErr)
	}

	return nil
}

func (in *stackRunController) executables(ctx context.Context) []exec.Executable {
// Ensure that steps are sorted in the correct order
slices.SortFunc(in.stackRun.Steps, func(s1, s2 *gqlclient.RunStepFragment) int {
Expand Down Expand Up @@ -118,42 +127,36 @@ func (in *stackRunController) toExecutable(ctx context.Context, step *gqlclient.

return exec.NewExecutable(
step.Cmd,
exec.WithDir(in.dir),
exec.WithDir(in.execWorkDir()),
exec.WithEnv(in.stackRun.Env()),
exec.WithArgs(args),
exec.WithID(step.ID),
exec.WithLogSink(consoleWriter),
exec.WithHook(stackrun.LifecyclePreStart, in.preExecHook(step.Stage, step.ID)),
exec.WithHook(stackrun.LifecyclePostStart, in.postExecHook(step.Stage, step.ID)),
exec.WithHook(v1.LifecyclePreStart, in.preExecHook(step.Stage, step.ID)),
exec.WithHook(v1.LifecyclePostStart, in.postExecHook(step.Stage)),
)
}

// markStackRun asks the console client to update the stack run identified by
// in.stackRunID so that it carries the given status. The client's error is
// returned unchanged.
func (in *stackRunController) markStackRun(status gqlclient.StackStatus) error {
	attrs := gqlclient.StackRunAttributes{Status: status}

	return in.consoleClient.UpdateStackRun(in.stackRunID, attrs)
}

// markStackRunStep asks the console client to update the run step with the
// given id so that it carries the given status. The client's error is returned
// unchanged.
func (in *stackRunController) markStackRunStep(id string, status gqlclient.StepStatus) error {
	attrs := gqlclient.RunStepAttributes{Status: status}

	return in.consoleClient.UpdateStackRunStep(id, attrs)
}

func (in *stackRunController) completeStackRun(status gqlclient.StackStatus, stackRunErr error) error {
state, err := in.tool.State()
if err != nil {
klog.ErrorS(err, "could not prepare state attributes")
}
var state *gqlclient.StackStateAttributes
var output []*gqlclient.StackOutputAttributes
var err error

if in.tool != nil {
state, err = in.tool.State()
if err != nil {
klog.ErrorS(err, "could not prepare state attributes")
}

klog.V(log.LogLevelTrace).InfoS("generated console state", "state", state)
klog.V(log.LogLevelTrace).InfoS("generated console state", "state", state)

output, err := in.tool.Output()
if err != nil {
klog.ErrorS(err, "could not prepare output attributes")
}
output, err = in.tool.Output()
if err != nil {
klog.ErrorS(err, "could not prepare output attributes")
}

klog.V(log.LogLevelTrace).InfoS("generated console output", "output", output)
klog.V(log.LogLevelTrace).InfoS("generated console output", "output", output)
}

serviceErrorAttributes := make([]*gqlclient.ServiceErrorAttributes, 0)
if stackRunErr != nil {
Expand All @@ -170,16 +173,29 @@ func (in *stackRunController) completeStackRun(status gqlclient.StackStatus, sta
})
}

// execWorkDir returns the directory used when executing stack run steps.
// If the stack run defines a non-empty ExecWorkDir, it is joined onto in.dir;
// otherwise in.dir itself is returned.
func (in *stackRunController) execWorkDir() string {
	sub := in.stackRun.ExecWorkDir
	if sub == nil || len(*sub) == 0 {
		return in.dir
	}

	return path.Join(in.dir, *sub)
}

func (in *stackRunController) prepare() error {
env := environment.New(
environment.WithStackRun(in.stackRun),
environment.WithWorkingDir(in.dir),
environment.WithFilesDir(in.execWorkDir()),
environment.WithFetchClient(in.fetchClient),
)

in.tool = tool.New(in.stackRun.Type, in.dir)
if err := env.Setup(); err != nil {
return err
}

in.tool = tool.New(in.stackRun.Type, in.execWorkDir())

return env.Setup()
return nil
}

func (in *stackRunController) init() (Controller, error) {
Expand Down Expand Up @@ -215,7 +231,6 @@ func NewStackRunController(options ...Option) (Controller, error) {
ctrl.executor = newExecutor(
errChan,
finishedChan,
//WithPreRunFunc(ctrl.preStepRun),
WithPostRunFunc(ctrl.postStepRun),
)

Expand Down
49 changes: 29 additions & 20 deletions pkg/harness/controller/controller_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,26 @@ import (
"github.com/pluralsh/deployment-operator/pkg/harness/environment"
internalerrors "github.com/pluralsh/deployment-operator/pkg/harness/errors"
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
v1 "github.com/pluralsh/deployment-operator/pkg/harness/stackrun/v1"
"github.com/pluralsh/deployment-operator/pkg/log"
)

var (
runApproved = false
)

// preStart function is executed before stack run steps.
func (in *stackRunController) preStart() {
if in.stackRun.Status != gqlclient.StackStatusPending && !environment.IsDev() {
klog.Fatalf("could not start stack run: invalid status: %s", in.stackRun.Status)
}

if err := in.markStackRun(gqlclient.StackStatusRunning); err != nil {
if err := stackrun.StartStackRun(in.consoleClient, in.stackRunID); err != nil {
klog.ErrorS(err, "could not update stack run status")
}
}

// postStart function is executed after all stack run steps.
func (in *stackRunController) postStart(err error) error {
var status gqlclient.StackStatus

Expand All @@ -49,6 +52,9 @@ func (in *stackRunController) postStart(err error) error {
return err
}

// postStepRun is a callback function started by the executor after an executable finishes.
// It receives the ID of the run step that was executed and the error it exited with,
// if any.
func (in *stackRunController) postStepRun(id string, err error) {
var status gqlclient.StepStatus

Expand All @@ -59,18 +65,14 @@ func (in *stackRunController) postStepRun(id string, err error) {
status = gqlclient.StepStatusFailed
}

if err := in.markStackRunStep(id, status); err != nil {
if err := stackrun.MarkStackRunStep(in.consoleClient, id, status); err != nil {
klog.ErrorS(err, "could not update stack run step status")
}
}

// preStepRun marks the run step with the given id as running. A failure to
// update the status is logged and otherwise ignored.
func (in *stackRunController) preStepRun(id string) {
	err := in.markStackRunStep(id, gqlclient.StepStatusRunning)
	if err == nil {
		return
	}

	klog.ErrorS(err, "could not update stack run status")
}

func (in *stackRunController) postExecHook(stage gqlclient.StepStage, id string) stackrun.HookFunction {
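// postExecHook is a callback function started by the exec.Executable after it finishes.
// It differs from postStepRun in that it is registered as a lifecycle hook on the
// executable itself, whereas postStepRun is invoked by the executor.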
func (in *stackRunController) postExecHook(stage gqlclient.StepStage) v1.HookFunction {
return func() error {
if stage != gqlclient.StepStagePlan {
return nil
Expand All @@ -80,27 +82,31 @@ func (in *stackRunController) postExecHook(stage gqlclient.StepStage, id string)
}
}

func (in *stackRunController) preExecHook(stage gqlclient.StepStage, id string) stackrun.HookFunction {
func (in *stackRunController) preExecHook(stage gqlclient.StepStage, id string) v1.HookFunction {
return func() error {
if stage == gqlclient.StepStageApply {
if err := in.approvalCheck(); err != nil {
return err
}
if stage == gqlclient.StepStageApply && in.requiresApproval() {
in.waitForApproval()
}

if err := in.markStackRunStep(id, gqlclient.StepStatusRunning); err != nil {
if err := stackrun.StartStackRunStep(in.consoleClient, id); err != nil {
klog.ErrorS(err, "could not update stack run status")
}

return nil
}
}

func (in *stackRunController) approvalCheck() error {
if !in.stackRun.Approval || runApproved {
return nil
}
// requiresApproval reports whether the stack run has approval enabled and the
// package-level runApproved flag has not yet been set.
func (in *stackRunController) requiresApproval() bool {
	if !in.stackRun.Approval {
		return false
	}

	return !runApproved
}

return wait.PollUntilContextCancel(context.Background(), 5*time.Second, true, func(_ context.Context) (done bool, err error) {
func (in *stackRunController) waitForApproval() {
// Retry here to make sure that the pending approval status will be set before we start waiting.
stackrun.MarkStackRunWithRetry(in.consoleClient, in.stackRunID, gqlclient.StackStatusPendingApproval, 5*time.Second)

klog.V(log.LogLevelInfo).InfoS("waiting for approval to proceed")
// Condition function never returns error. We can ignore it.
_ = wait.PollUntilContextCancel(context.Background(), 5*time.Second, true, func(_ context.Context) (done bool, err error) {
if runApproved {
return true, nil
}
Expand All @@ -114,6 +120,9 @@ func (in *stackRunController) approvalCheck() error {
runApproved = stack.ApprovedAt != nil
return runApproved, nil
})

// Retry here to make sure that we resume the stack run status to running after it has been approved.
stackrun.MarkStackRunWithRetry(in.consoleClient, in.stackRunID, gqlclient.StackStatusRunning, 5*time.Second)
}

func (in *stackRunController) uploadPlan() error {
Expand Down
9 changes: 5 additions & 4 deletions pkg/harness/controller/controller_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"github.com/pluralsh/deployment-operator/internal/helpers"
console "github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/harness/sink"
"github.com/pluralsh/deployment-operator/pkg/harness/stackrun"
v1 "github.com/pluralsh/deployment-operator/pkg/harness/tool/v1"
stackrunv1 "github.com/pluralsh/deployment-operator/pkg/harness/stackrun/v1"
toolv1 "github.com/pluralsh/deployment-operator/pkg/harness/tool/v1"
)

type Controller interface {
Start(ctx context.Context) error
Finish(stackRunErr error) error
}

type stackRunController struct {
Expand All @@ -35,7 +36,7 @@ type stackRunController struct {
stackRunStepTimeout time.Duration

// stackRun
stackRun *stackrun.StackRun
stackRun *stackrunv1.StackRun

// consoleClient
consoleClient console.Client
Expand All @@ -55,7 +56,7 @@ type stackRunController struct {
// List of supported tools is based on the gqlclient.StackType.
// It is mainly responsible for:
// - gathering state
tool v1.Tool
tool toolv1.Tool

// wg
wg sync.WaitGroup
Expand Down
Loading

0 comments on commit f5daa01

Please sign in to comment.