Skip to content

Commit

Permalink
Gracefully handle non-throttling errors (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
tlake authored Nov 19, 2019
1 parent c81b60d commit 7439ce7
Showing 1 changed file with 32 additions and 21 deletions.
53 changes: 32 additions & 21 deletions ecstask/ecs_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func Run(cmd *cobra.Command, args []string, flags map[string]interface{}) {
fmt.Println("\n`--apply` flag present")
fmt.Printf("deregistering %d task definitions...\n", len(allTaskDefinitionArns))

deregisterTaskDefinitions(svc, allTaskDefinitionArns, flags["parallel"].(int), flags["debug"].(bool))
deregisterTaskDefinitions(svc, allTaskDefinitionArns, flags["parallel"].(int), flags["verbose"].(bool), flags["debug"].(bool))

fmt.Println("\nfinished")
} else {
Expand Down Expand Up @@ -412,7 +412,7 @@ type Result struct {
Err error
}

func deregisterTaskDefinitions(svc *ecs.ECS, taskDefinitionArns []string, parallel int, debug bool) {
func deregisterTaskDefinitions(svc *ecs.ECS, taskDefinitionArns []string, parallel int, verbose, debug bool) {
jobsChan := make(chan Job, parallel)
resultsChan := make(chan Result, parallel)

Expand All @@ -423,7 +423,7 @@ func deregisterTaskDefinitions(svc *ecs.ECS, taskDefinitionArns []string, parall
}

wg.Add(1)
go dispatcher(&wg, taskDefinitionArns, parallel, jobsChan, resultsChan, debug)
go dispatcher(&wg, taskDefinitionArns, parallel, jobsChan, resultsChan, verbose, debug)

wg.Wait()
}
Expand All @@ -443,13 +443,15 @@ func worker(svc *ecs.ECS, wg *sync.WaitGroup, jobsChan <-chan Job, resultsChan c
wg.Done()
}

func dispatcher(wg *sync.WaitGroup, arns []string, parallel int, jobsChan chan Job, resultsChan chan Result, debug bool) {
func dispatcher(wg *sync.WaitGroup, arns []string, parallel int, jobsChan chan Job, resultsChan chan Result, verbose, debug bool) {
jobs := stack.New()
for _, arn := range arns {
jobs.Push(Job{arn})
}

var completedJobs int
var failedJobs []Result
var numCompletedJobs int
numJobsToComplete := len(arns)

preload := 1
if parallel > 1 {
Expand All @@ -468,39 +470,48 @@ func dispatcher(wg *sync.WaitGroup, arns []string, parallel int, jobsChan chan J

for result := range resultsChan {
if result.Err != nil {
if !isThrottlingError(result.Err) {
fmt.Println("\nError deregistering task definition,", result.Err)
close(jobsChan)
close(resultsChan)
wg.Done()
return
}
if isThrottlingError(result.Err) {
t := b.Duration()
if debug {
fmt.Printf("\nbackoff triggered for %s,", result.Arn)
fmt.Printf("\nwaiting for %v\n", t)
}

time.Sleep(t)
jobs.Push(Job{Arn: result.Arn})

t := b.Duration()
if debug {
fmt.Printf("\nbackoff triggered for %s,", result.Arn)
fmt.Printf("\nwaiting for %v\n", t)
} else {
failedJobs = append(failedJobs, result)
numJobsToComplete--
}

time.Sleep(t)
jobs.Push(Job{Arn: result.Arn})
} else {
b.Reset()
completedJobs++
fmt.Printf("\r%d deregistered task definitions", completedJobs)
numCompletedJobs++
}

fmt.Printf("\r%d deregistered task definitions, %d errored", numCompletedJobs, len(failedJobs))

if jobs.Len() > 0 {
jobsChan <- jobs.Pop().(Job)
}

if completedJobs == len(arns) {
if numCompletedJobs >= numJobsToComplete {
close(jobsChan)
close(resultsChan)
}
}

wg.Done()

if len(failedJobs) > 0 {
fmt.Printf("\n%d task definitions errored.\n", len(failedJobs))
if verbose {
for _, result := range failedJobs {
fmt.Printf("%s: %v\n", result.Arn, result.Err)
}
}
}
}

func isThrottlingError(err error) bool {
Expand Down

0 comments on commit 7439ce7

Please sign in to comment.