Skip to content

Commit

Permalink
fix(flink): poll application deployment status on creation
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Feb 19, 2024
1 parent 1bb4a14 commit 3484b70
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ nav_order: 1
- Fix `aiven_organization_user_group` resource - `description` field is required
- Use golang 1.22
- Output explicitly `termination_protection = true -> false` when service property is removed
- Fix `aiven_flink_application_deployment` deletion

## [4.13.3] - 2024-01-29

Expand Down
20 changes: 0 additions & 20 deletions internal/plugin/util/wait.go

This file was deleted.

55 changes: 32 additions & 23 deletions internal/sdkprovider/service/flink/flink_application_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package flink

import (
"context"
"time"
"fmt"

"github.com/aiven/aiven-go-client/v2"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand Down Expand Up @@ -119,6 +119,10 @@ func resourceFlinkApplicationDeploymentCreate(

d.SetId(schemautil.BuildResourceID(project, serviceName, applicationID, r.ID))

if err = waitStatusChange(ctx, client, d, "INITIALIZING"); err != nil {
return diag.Errorf("error waiting for Flink Application Deployment to become initialized: %s", err)
}

return resourceFlinkApplicationDeploymentRead(ctx, d, m)
}

Expand All @@ -140,28 +144,7 @@ func resourceFlinkApplicationDeploymentDelete(
return diag.Errorf("error cancelling Flink Application Deployment: %v", err)
}

//goland:noinspection GoDeprecation
conf := &retry.StateChangeConf{
Pending: []string{
"CANCELLING",
},
Target: []string{
"CANCELED",
},
Refresh: func() (interface{}, string, error) {
r, err := client.FlinkApplicationDeployments.Get(ctx, project, serviceName, applicationID, deploymentID)
if err != nil {
return nil, "", err
}
return r, r.Status, nil
},
Delay: 1 * time.Second,
Timeout: d.Timeout(schema.TimeoutDelete),
MinTimeout: 1 * time.Second,
}

_, err = conf.WaitForStateContext(ctx)
if err != nil {
if err = waitStatusChange(ctx, client, d, "CANCELLING"); err != nil {
return diag.Errorf("error waiting for Flink Application Deployment to become canceled: %s", err)
}

Expand All @@ -170,6 +153,10 @@ func resourceFlinkApplicationDeploymentDelete(
return diag.Errorf("error deleting Flink Application Deployment: %v", err)
}

if err = waitStatusChange(ctx, client, d, "DELETING"); !(err == nil || aiven.IsNotFound(err)) {
return diag.Errorf("error waiting for Flink Application Deployment to become deleted: %s", err)
}

return nil
}

Expand Down Expand Up @@ -225,3 +212,25 @@ func resourceFlinkApplicationDeploymentRead(ctx context.Context, d *schema.Resou

return nil
}

// waitStatusChange waits for given status to change.
// Flink Application Deployment state machine is huge, better not to implement it here.
func waitStatusChange(ctx context.Context, client *aiven.Client, d *schema.ResourceData, status string) error {
project, serviceName, applicationID, deploymentID, err := schemautil.SplitResourceID4(d.Id())
if err != nil {
return err
}

return retry.RetryContext(ctx, d.Timeout(schema.TimeoutCreate), func() *retry.RetryError {
deployment, err := client.FlinkApplicationDeployments.Get(ctx, project, serviceName, applicationID, deploymentID)
if err != nil {
// The client has retries under the hood for 50x and other cases.
return retry.NonRetryableError(err)
}

if status == deployment.FlinkApplicationDeployment.Status {
return retry.RetryableError(fmt.Errorf("expected %q status to change", status))
}
return nil
})
}

0 comments on commit 3484b70

Please sign in to comment.