Skip to content

Commit

Permalink
fix: replace deprecated method
Browse files Browse the repository at this point in the history
  • Loading branch information
vsukhin committed Oct 6, 2023
1 parent fc6f4a6 commit 7b50053
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 24 deletions.
6 changes: 3 additions & 3 deletions pkg/executor/client/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ func (c *JobExecutor) updateResultsFromPod(ctx context.Context, pod corev1.Pod,
}()

// wait for pod to be loggable
if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.ClientSet, pod.Name, c.Namespace)); err != nil {
if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.ClientSet, pod.Name, c.Namespace)); err != nil {
l.Errorw("waiting for pod started error", "error", err)
}

l.Debug("poll immediate waiting for pod")
// wait for pod
if err = wait.PollImmediate(pollInterval, pollTimeout, executor.IsPodReady(ctx, c.ClientSet, pod.Name, c.Namespace)); err != nil {
if err = wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, executor.IsPodReady(c.ClientSet, pod.Name, c.Namespace)); err != nil {
// continue on poll err and try to get logs later
l.Errorw("waiting for pod complete error", "error", err)
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func (c *JobExecutor) TailJobLogs(ctx context.Context, id string, logs chan []by

default:
l.Debugw("tailing job logs: waiting for pod to be ready")
if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.ClientSet, pod.Name, c.Namespace)); err != nil {
if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.ClientSet, pod.Name, c.Namespace)); err != nil {
l.Errorw("poll immediate error when tailing logs", "error", err)
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ type Images struct {
}

// IsPodReady defines if pod is ready or failed for logs scrapping
func IsPodReady(ctx context.Context, c kubernetes.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
func IsPodReady(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return false, err
Expand All @@ -178,8 +178,8 @@ func IsPodReady(ctx context.Context, c kubernetes.Interface, podName, namespace
}

// IsPodLoggable defines if pod is ready to get logs from it
func IsPodLoggable(ctx context.Context, c kubernetes.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
func IsPodLoggable(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return false, err
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/containerexecutor/containerexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,9 @@ func (c *ContainerExecutor) updateResultsFromPod(

// wait for pod
l.Debug("poll immediate waiting for executor pod")
if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.clientSet, executorPod.Name, c.namespace)); err != nil {
if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.clientSet, executorPod.Name, c.namespace)); err != nil {
l.Errorw("waiting for executor pod started error", "error", err)
} else if err = wait.PollImmediate(pollInterval, pollTimeout, executor.IsPodReady(ctx, c.clientSet, executorPod.Name, c.namespace)); err != nil {
} else if err = wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, executor.IsPodReady(c.clientSet, executorPod.Name, c.namespace)); err != nil {
// continue on poll err and try to get logs later
l.Errorw("waiting for executor pod complete error", "error", err)
}
Expand Down Expand Up @@ -350,9 +350,9 @@ func (c *ContainerExecutor) updateResultsFromPod(
for _, scraperPod := range scraperPods.Items {
if scraperPod.Status.Phase != corev1.PodRunning && scraperPod.Labels["job-name"] == scraperPodName {
l.Debug("poll immediate waiting for scraper pod to succeed")
if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.clientSet, scraperPod.Name, c.namespace)); err != nil {
if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.clientSet, scraperPod.Name, c.namespace)); err != nil {
l.Errorw("waiting for scraper pod started error", "error", err)
} else if err = wait.PollImmediate(pollInterval, pollTimeout, executor.IsPodReady(ctx, c.clientSet, scraperPod.Name, c.namespace)); err != nil {
} else if err = wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, executor.IsPodReady(c.clientSet, scraperPod.Name, c.namespace)); err != nil {
// continue on poll err and try to get logs later
l.Errorw("waiting for scraper pod complete error", "error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/containerexecutor/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (c *ContainerExecutor) TailJobLogs(ctx context.Context, id string, logs cha

default:
l.Debugw("tailing job logs: waiting for pod to be ready")
if err = wait.PollImmediate(pollInterval, c.podStartTimeout, executor.IsPodLoggable(ctx, c.clientSet, pod.Name, c.namespace)); err != nil {
if err = wait.PollUntilContextTimeout(ctx, pollInterval, c.podStartTimeout, true, executor.IsPodLoggable(c.clientSet, pod.Name, c.namespace)); err != nil {
l.Errorw("poll immediate error when tailing logs", "error", err)
return err
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/k8sclient/k8sclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func GetIngressAddress(clientSet kubernetes.Interface, ingressName string, names
}

// IsPersistentVolumeClaimBound TODO: add description.
func IsPersistentVolumeClaimBound(c kubernetes.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
func IsPersistentVolumeClaimBound(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
pv, err := c.CoreV1().PersistentVolumeClaims(namespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
return false, err
Expand All @@ -144,8 +144,8 @@ func IsPersistentVolumeClaimBound(c kubernetes.Interface, podName, namespace str
}

// IsPodRunning check if the pod in question is running state
func IsPodRunning(c kubernetes.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
func IsPodRunning(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
return false, err
Expand All @@ -162,8 +162,8 @@ func IsPodRunning(c kubernetes.Interface, podName, namespace string) wait.Condit
}

// HasPodSucceeded custom method for checing if Pod is succeded (handles PodFailed state too)
func HasPodSucceeded(c kubernetes.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
func HasPodSucceeded(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
return false, err
Expand All @@ -180,8 +180,8 @@ func HasPodSucceeded(c kubernetes.Interface, podName, namespace string) wait.Con
}

// IsPodReady check if the pod in question is running state
func IsPodReady(c kubernetes.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
func IsPodReady(c kubernetes.Interface, podName, namespace string) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
return false, nil
Expand All @@ -201,17 +201,17 @@ func IsPodReady(c kubernetes.Interface, podName, namespace string) wait.Conditio

// WaitForPodsReady wait for pods to be running with a timeout, return error
func WaitForPodsReady(k8sClient kubernetes.Interface, namespace string, instance string, timeout time.Duration) error {

pods, err := k8sClient.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: "app.kubernetes.io/instance=" + instance})
ctx := context.TODO()
pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: "app.kubernetes.io/instance=" + instance})
if err != nil {
return err
}

for _, pod := range pods.Items {
if err := wait.PollImmediate(time.Second, timeout, IsPodRunning(k8sClient, pod.Name, namespace)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, IsPodRunning(k8sClient, pod.Name, namespace)); err != nil {
return err
}
if err := wait.PollImmediate(time.Second, timeout, IsPodReady(k8sClient, pod.Name, namespace)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, IsPodReady(k8sClient, pod.Name, namespace)); err != nil {
return err
}
}
Expand Down

0 comments on commit 7b50053

Please sign in to comment.