Skip to content

Commit

Permalink
update waiting for k8s jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
KrKOo committed Jun 12, 2024
1 parent d0399c9 commit c0afab1
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions worker/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"fmt"
"io"
"text/template"
"time"

"github.com/ohsu-comp-bio/funnel/tes"
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/client-go/rest"
)

Expand Down Expand Up @@ -84,14 +85,7 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error {
return fmt.Errorf("creating job: %v", err)
}

// Wait until the job finishes
watcher, err := client.Watch(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", taskId, kcmd.JobId)})
if err != nil {
return err
}

defer watcher.Stop()
waitForJobFinish(ctx, watcher)
waitForJobFinish(ctx, client, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", taskId, kcmd.JobId)})

pods, err := clientset.CoreV1().Pods(kcmd.Namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s-%d", taskId, kcmd.JobId)})
if err != nil {
Expand Down Expand Up @@ -145,23 +139,30 @@ func (kcmd KubernetesCommand) Stop() error {
}

// Waits until the job finishes
func waitForJobFinish(ctx context.Context, watcher watch.Interface) {
func waitForJobFinish(ctx context.Context, client batchv1.JobInterface, listOptions metav1.ListOptions) {
ticker := time.NewTicker(10 * time.Second)

for {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Error || event.Type == watch.Deleted {
case <-ctx.Done():
return
case <-ticker.C:
jobs, err := client.List(ctx, listOptions)

if err != nil {
return
} else if event.Type == watch.Bookmark {
continue
}

job := event.Object.(*v1.Job)
if len(jobs.Items) == 0 {
// Should not happen
return
}

// There should be always only one job
job := jobs.Items[0]
if job.Status.Succeeded > 0 || job.Status.Failed > 0 {
return
}
case <-ctx.Done():
return
}
}
}
Expand Down

0 comments on commit c0afab1

Please sign in to comment.