Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

baur run: exit gracefully on errors, finish started task-runs, upload and db recording operations #335

Closed
wants to merge 7 commits into from
20 changes: 7 additions & 13 deletions internal/command/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"os"
"path/filepath"

"github.com/fatih/color"

"github.com/simplesurance/baur/v2/internal/command/term"
"github.com/simplesurance/baur/v2/internal/format"
"github.com/simplesurance/baur/v2/internal/log"
Expand Down Expand Up @@ -217,25 +215,21 @@ func mustWriteRow(fmt format.Formatter, row ...interface{}) {
exitOnErr(err)
}

var errorPrefix = color.New(color.FgRed).Sprint("ERROR:")

func exitOnErrf(err error, format string, v ...interface{}) {
exitOnErr(err, fmt.Sprintf(format, v...))
if err == nil {
return
}

stderr.ErrPrintf(err, format, v...)
exitFunc(1)
}

func exitOnErr(err error, msg ...interface{}) {
if err == nil {
return
}

if len(msg) == 0 {
stderr.Println(errorPrefix, err)
exitFunc(1)
}

wholeMsg := fmt.Sprint(msg...)
stderr.Printf("%s %s: %s\n", errorPrefix, wholeMsg, err)

stderr.ErrPrintln(err, msg...)
exitFunc(1)
}

Expand Down
147 changes: 113 additions & 34 deletions internal/command/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package command

import (
"context"
"errors"
"fmt"
"math"
"strings"
"sync"
"time"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -64,6 +66,12 @@ The following Environment Variables are supported:
term.Highlight("DOCKER_CERT_PATH"),
term.Highlight("DOCKER_TLS_VERIFY"))

var (
statusStrSuccess = term.GreenHighlight("successful")
statusStrSkipped = term.YellowHighlight("skipped")
statusStrFailed = term.RedHighlight("failed")
)

func init() {
rootCmd.AddCommand(&newRunCmd().Command)
}
Expand All @@ -72,12 +80,12 @@ type runCmd struct {
cobra.Command

// Cmdline parameters
skipUpload bool
force bool
inputStr []string
lookupInputStr string
taskRunners uint
showOutput bool
skipUpload bool
force bool
inputStr []string
lookupInputStr string
taskRunnerGoRoutines uint
showOutput bool

// other fields
storage storage.Storer
Expand All @@ -88,6 +96,10 @@ type runCmd struct {

uploadRoutinePool *routines.Pool
taskRunnerRoutinePool *routines.Pool
taskRunner *baur.TaskRunner

skipAllScheduledTaskRunsOnce sync.Once
errorHappened bool
}

func newRunCmd() *runCmd {
Expand All @@ -110,7 +122,7 @@ func newRunCmd() *runCmd {
"include a string as input, can be specified multiple times")
cmd.Flags().StringVar(&cmd.lookupInputStr, "lookup-input-str", "",
"if a run can not be found, try to find a run with this value as input-string")
cmd.Flags().UintVarP(&cmd.taskRunners, "parallel-runs", "p", 1,
cmd.Flags().UintVarP(&cmd.taskRunnerGoRoutines, "parallel-runs", "p", 1,
"specifies the max. number of tasks to run in parallel")
cmd.Flags().BoolVarP(&cmd.showOutput, "show-task-output", "o", false,
"show the output of tasks, if disabled the output is only shown "+
Expand All @@ -123,7 +135,7 @@ func newRunCmd() *runCmd {
func (c *runCmd) run(cmd *cobra.Command, args []string) {
var err error

if c.taskRunners == 0 {
if c.taskRunnerGoRoutines == 0 {
stderr.Printf("--parallel-runs must be greater than 0\n")
exitFunc(1)
}
Expand All @@ -137,7 +149,8 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) {
defer c.storage.Close()

c.uploadRoutinePool = routines.NewPool(1) // run 1 upload in parallel with builds
c.taskRunnerRoutinePool = routines.NewPool(c.taskRunners)
c.taskRunnerRoutinePool = routines.NewPool(c.taskRunnerGoRoutines)
c.taskRunner = baur.NewTaskRunner()

c.dockerClient, err = docker.NewClient(log.StdLogger.Debugf)
exitOnErr(err)
Expand Down Expand Up @@ -184,77 +197,120 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) {
ptCopy := pt
c.taskRunnerRoutinePool.Queue(func() {
task := ptCopy.task
runResult := c.runTask(task)
runResult, err := c.runTask(task)
if err != nil {
// error is printed in runTask()
c.skipAllScheduledTaskRuns()
return
}

outputs, err := baur.OutputsFromTask(c.dockerClient, task)
exitOnErrf(err, task.ID())
if err != nil {
stderr.ErrPrintln(err, task.ID())
c.skipAllScheduledTaskRuns()
return
}

if !outputsExist(task, outputs) {
exitFunc(1)
// error is printed in runTask()
c.skipAllScheduledTaskRuns()
return
}

if c.skipUpload {
FranciscoKurpiel marked this conversation as resolved.
Show resolved Hide resolved
return
}

c.uploadRoutinePool.Queue(func() {
c.uploadAndRecord(ctx, ptCopy.task, ptCopy.inputs, outputs, runResult)
err := c.uploadAndRecord(ctx, ptCopy.task, ptCopy.inputs, outputs, runResult)
if err != nil {
// error is printed in runTask()
c.skipAllScheduledTaskRuns()
}
})
})
}

c.taskRunnerRoutinePool.Wait()

stdout.Println("all tasks executed, waiting for uploads to finish...")
stdout.Println("task execution finished, waiting for uploads to finish...")
c.uploadRoutinePool.Wait()
stdout.PrintSep()
stdout.Printf("finished in: %s\n",
term.FormatDuration(
time.Since(startTime),
),
)

if c.errorHappened {
exitFunc(1)
}
}

func (c *runCmd) runTask(task *baur.Task) *baur.RunResult {
result, err := baur.NewTaskRunner().Run(task)
exitOnErrf(err, "%s", task.ID())
func (c *runCmd) skipAllScheduledTaskRuns() {
c.skipAllScheduledTaskRunsOnce.Do(func() {
c.taskRunner.SkipRuns(true)

if result.Result.ExitCode != 0 {
statusStr := term.RedHighlight("failed")
c.errorHappened = true

stderr.Printf("%s, %s execution of queued task runs\n",
term.RedHighlight("terminating"),
term.YellowHighlight("skipping"),
)
})
}

func (c *runCmd) runTask(task *baur.Task) (*baur.RunResult, error) {
result, err := c.taskRunner.Run(task)
if err != nil {
if errors.Is(err, baur.ErrTaskRunSkipped) {
stderr.Printf("%s: execution %s\n",
term.Highlight(task),
statusStrSkipped,
)
return nil, err
}

stderr.Printf("%s: execution %s, error: %s\n",
term.Highlight(task),
statusStrFailed,
err,
)
return nil, err
}

if result.Result.ExitCode != 0 {
if c.showOutput {
stderr.Printf("%s: execution %s (%s), command exited with code %d\n",
task,
statusStr,
term.Highlight(task),
statusStrFailed,
term.FormatDuration(
result.StopTime.Sub(result.StartTime),
),
result.ExitCode,
)
} else {
stderr.Printf("%s: execution %s (%s), command exited with code %d, output:\n%s\n",
task,
statusStr,
term.Highlight(task),
statusStrFailed,
term.FormatDuration(
result.StopTime.Sub(result.StartTime),
),
result.ExitCode,
result.StrOutput())
}

exitFunc(1)
return nil, fmt.Errorf("execution failed with exit code %d", result.ExitCode)
}

statusStr := term.GreenHighlight("successful")

stdout.TaskPrintf(task, "execution %s (%s)\n",
statusStr,
statusStrSuccess,
term.FormatDuration(
result.StopTime.Sub(result.StartTime),
),
)

return result
return result, nil
}

type pendingTask struct {
Expand All @@ -268,7 +324,7 @@ func (c *runCmd) uploadAndRecord(
inputs *baur.Inputs,
outputs []baur.Output,
runResult *baur.RunResult,
) {
) error {
var uploadResults []*baur.UploadResult

for _, output := range outputs {
Expand All @@ -280,7 +336,11 @@ func (c *runCmd) uploadAndRecord(
},
func(o baur.Output, result *baur.UploadResult) {
size, err := o.SizeBytes()
exitOnErrf(err, "%s: %s:", task.ID(), output)
if err != nil {
stderr.ErrPrintf(err, "%s: %s", task, output)
c.skipAllScheduledTaskRuns()
return
}

bps := uint64(math.Round(float64(size) / result.Stop.Sub(result.Start).Seconds()))

Expand All @@ -292,14 +352,30 @@ func (c *runCmd) uploadAndRecord(
uploadResults = append(uploadResults, result)
},
)

exitOnErrf(err, "%s: %s", task.ID(), output)
if err != nil {
stderr.Printf("%s: %s: upload %s, %s\n",
term.Highlight(task),
output,
statusStrFailed,
err,
)
return err
}
}

id, err := baur.StoreRun(ctx, c.storage, c.vcsState, task, inputs, runResult, uploadResults)
exitOnErrf(err, "%s", task.ID())
if err != nil {
stderr.Printf("%s: recording build result in database %s, %s\n",
term.Highlight(task),
statusStrFailed,
err,
)
return err
}

stdout.TaskPrintf(task, "run stored in database with ID %s\n", term.Highlight(id))

return nil
}

func outputsExist(task *baur.Task, outputs []baur.Output) bool {
Expand All @@ -312,7 +388,10 @@ func outputsExist(task *baur.Task, outputs []baur.Output) bool {

for _, output := range outputs {
exists, err := output.Exists()
exitOnErrf(err, "%s:", task.ID())
if err != nil {
stderr.ErrPrintf(err, task.ID())
return false
}

if exists {
size, err := output.SizeBytes()
Expand Down
22 changes: 22 additions & 0 deletions internal/command/term/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"io"
"sync"

"github.com/fatih/color"

"github.com/simplesurance/baur/v2/pkg/baur"
)

const separator = "------------------------------------------------------------------------------"

var errorPrefix = color.New(color.FgRed).Sprint("ERROR:")

// Stream is a concurrency-safe output for term.messages.
type Stream struct {
stream io.Writer
Expand Down Expand Up @@ -41,6 +45,24 @@ func (s *Stream) TaskPrintf(task *baur.Task, format string, a ...interface{}) {
s.Printf(prefix+format, a...)
}

// ErrPrintln prints an error with an optional message.
// The method prints the error in the format: errorPrefix msg: err
func (s *Stream) ErrPrintln(err error, msg ...interface{}) {
if len(msg) == 0 {
s.Println(errorPrefix, err)
return
}

wholeMsg := fmt.Sprint(msg...)
s.Printf("%s %s: %s\n", errorPrefix, wholeMsg, err)
}

// ErrPrintf prints an error with an optional printf-style message.
// The method prints the error in the format: errorPrefix msg: err
func (s *Stream) ErrPrintf(err error, format string, a ...interface{}) {
s.ErrPrintln(err, fmt.Sprintf(format, a...))
}

// PrintSep prints a separator line
func (s *Stream) PrintSep() {
fmt.Fprintln(s.stream, separator)
Expand Down
Loading