From 7b50053b3c6fc632164dab9494123f6bdddee220 Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Fri, 6 Oct 2023 22:07:04 +0300 Subject: [PATCH] fix: replace deprecated method --- pkg/executor/client/job.go | 6 ++--- pkg/executor/common.go | 8 +++---- .../containerexecutor/containerexecutor.go | 8 +++---- pkg/executor/containerexecutor/logs.go | 2 +- pkg/k8sclient/k8sclient.go | 24 +++++++++---------- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/pkg/executor/client/job.go b/pkg/executor/client/job.go index 7aecf04b2c1..78eeb84d583 100644 --- a/pkg/executor/client/job.go +++ b/pkg/executor/client/job.go @@ -312,13 +312,13 @@ func (c *JobExecutor) updateResultsFromPod(ctx context.Context, pod corev1.Pod, }() // wait for pod to be loggable - if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.ClientSet, pod.Name, c.Namespace)); err != nil { + if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.ClientSet, pod.Name, c.Namespace)); err != nil { l.Errorw("waiting for pod started error", "error", err) } l.Debug("poll immediate waiting for pod") // wait for pod - if err = wait.PollImmediate(pollInterval, pollTimeout, executor.IsPodReady(ctx, c.ClientSet, pod.Name, c.Namespace)); err != nil { + if err = wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, executor.IsPodReady(c.ClientSet, pod.Name, c.Namespace)); err != nil { // continue on poll err and try to get logs later l.Errorw("waiting for pod complete error", "error", err) } @@ -543,7 +543,7 @@ func (c *JobExecutor) TailJobLogs(ctx context.Context, id string, logs chan []by default: l.Debugw("tailing job logs: waiting for pod to be ready") - if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.ClientSet, pod.Name, c.Namespace)); err != nil { + if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.ClientSet, pod.Name, c.Namespace)); err != nil { l.Errorw("poll immediate error when tailing logs", "error", err) return err } diff --git a/pkg/executor/common.go b/pkg/executor/common.go index 7e1155e2c58..7e03da8cacc 100644 --- a/pkg/executor/common.go +++ b/pkg/executor/common.go @@ -158,8 +158,8 @@ type Images struct { } // IsPodReady defines if pod is ready or failed for logs scrapping -func IsPodReady(ctx context.Context, c kubernetes.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { +func IsPodReady(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) if err != nil { return false, err @@ -178,8 +178,8 @@ func IsPodReady(ctx context.Context, c kubernetes.Interface, podName, namespace } // IsPodLoggable defines if pod is ready to get logs from it -func IsPodLoggable(ctx context.Context, c kubernetes.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { +func IsPodLoggable(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) if err != nil { return false, err diff --git a/pkg/executor/containerexecutor/containerexecutor.go b/pkg/executor/containerexecutor/containerexecutor.go index 48133f74d80..fab47e81e53 100644 --- a/pkg/executor/containerexecutor/containerexecutor.go +++ b/pkg/executor/containerexecutor/containerexecutor.go @@ -307,9 +307,9 @@ func (c *ContainerExecutor) updateResultsFromPod( // wait for pod l.Debug("poll immediate waiting for executor pod") - if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.clientSet, executorPod.Name, c.namespace)); err != nil { + if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.clientSet, executorPod.Name, c.namespace)); err != nil { l.Errorw("waiting for executor pod started error", "error", err) - } else if err = wait.PollImmediate(pollInterval, pollTimeout, executor.IsPodReady(ctx, c.clientSet, executorPod.Name, c.namespace)); err != nil { + } else if err = wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, executor.IsPodReady(c.clientSet, executorPod.Name, c.namespace)); err != nil { // continue on poll err and try to get logs later l.Errorw("waiting for executor pod complete error", "error", err) } @@ -350,9 +350,9 @@ func (c *ContainerExecutor) updateResultsFromPod( for _, scraperPod := range scraperPods.Items { if scraperPod.Status.Phase != corev1.PodRunning && scraperPod.Labels["job-name"] == scraperPodName { l.Debug("poll immediate waiting for scraper pod to succeed") - if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.clientSet, scraperPod.Name, c.namespace)); err != nil { + if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.clientSet, scraperPod.Name, c.namespace)); err != nil { l.Errorw("waiting for scraper pod started error", "error", err) - } else if err = wait.PollImmediate(pollInterval, pollTimeout, executor.IsPodReady(ctx, c.clientSet, scraperPod.Name, c.namespace)); err != nil { + } else if err = wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, executor.IsPodReady(c.clientSet, scraperPod.Name, c.namespace)); err != nil { // continue on poll err and try to get logs later l.Errorw("waiting for scraper pod complete error", "error", err) } diff --git a/pkg/executor/containerexecutor/logs.go b/pkg/executor/containerexecutor/logs.go index eabc88834c1..beb482c45a8 100644 --- a/pkg/executor/containerexecutor/logs.go +++ b/pkg/executor/containerexecutor/logs.go @@ -43,7 +43,7 @@ func (c *ContainerExecutor) TailJobLogs(ctx context.Context, id string, logs cha default: l.Debugw("tailing job logs: waiting for pod to be ready") - if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.clientSet, pod.Name, c.namespace)); err != nil { + if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.clientSet, pod.Name, c.namespace)); err != nil { l.Errorw("poll immediate error when tailing logs", "error", err) return err } diff --git a/pkg/k8sclient/k8sclient.go b/pkg/k8sclient/k8sclient.go index b13a0e26bfc..b47b506ec9d 100644 --- a/pkg/k8sclient/k8sclient.go +++ b/pkg/k8sclient/k8sclient.go @@ -126,8 +126,8 @@ func GetIngressAddress(clientSet kubernetes.Interface, ingressName string, names } // IsPersistentVolumeClaimBound TODO: add description. -func IsPersistentVolumeClaimBound(c kubernetes.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { +func IsPersistentVolumeClaimBound(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { pv, err := c.CoreV1().PersistentVolumeClaims(namespace).Get(context.Background(), podName, metav1.GetOptions{}) if err != nil { return false, err @@ -144,8 +144,8 @@ func IsPersistentVolumeClaimBound(c kubernetes.Interface, podName, namespace str } // IsPodRunning check if the pod in question is running state -func IsPodRunning(c kubernetes.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { +func IsPodRunning(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{}) if err != nil { return false, err @@ -162,8 +162,8 @@ func IsPodRunning(c kubernetes.Interface, podName, namespace string) wait.Condit } // HasPodSucceeded custom method for checing if Pod is succeded (handles PodFailed state too) -func HasPodSucceeded(c kubernetes.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { +func HasPodSucceeded(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{}) if err != nil { return false, err @@ -180,8 +180,8 @@ func HasPodSucceeded(c kubernetes.Interface, podName, namespace string) wait.Con } // IsPodReady check if the pod in question is running state -func IsPodReady(c kubernetes.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { +func IsPodReady(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{}) if err != nil { return false, nil @@ -201,17 +201,17 @@ func IsPodReady(c kubernetes.Interface, podName, namespace string) wait.Conditio // WaitForPodsReady wait for pods to be running with a timeout, return error func WaitForPodsReady(k8sClient kubernetes.Interface, namespace string, instance string, timeout time.Duration) error { - - pods, err := k8sClient.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: "app.kubernetes.io/instance=" + instance}) + ctx := context.TODO() + pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: "app.kubernetes.io/instance=" + instance}) if err != nil { return err } for _, pod := range pods.Items { - if err := wait.PollImmediate(time.Second, timeout, IsPodRunning(k8sClient, pod.Name, namespace)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, IsPodRunning(k8sClient, pod.Name, namespace)); err != nil { return err } - if err := wait.PollImmediate(time.Second, timeout, IsPodReady(k8sClient, pod.Name, namespace)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, IsPodReady(k8sClient, pod.Name, namespace)); err != nil { return err } }