Skip to content

Commit

Permalink
fix(flink): poll application deployment status on creation
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Feb 19, 2024
1 parent 1bb4a14 commit f778a25
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ nav_order: 1
- Fix `aiven_organization_user_group` resource - `description` field is required
- Use golang 1.22
- Output explicitly `termination_protection = true -> false` when service property is removed
- Fix `aiven_flink_application_deployment` deletion

## [4.13.3] - 2024-01-29

Expand Down
20 changes: 0 additions & 20 deletions internal/plugin/util/wait.go

This file was deleted.

58 changes: 24 additions & 34 deletions internal/sdkprovider/service/flink/flink_application_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/aiven/aiven-go-client/v2"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"

Expand Down Expand Up @@ -135,42 +134,33 @@ func resourceFlinkApplicationDeploymentDelete(
return diag.Errorf("cannot read Flink Application Deployment resource ID: %v", err)
}

_, err = client.FlinkApplicationDeployments.Cancel(ctx, project, serviceName, applicationID, deploymentID)
if err != nil {
return diag.Errorf("error cancelling Flink Application Deployment: %v", err)
}

//goland:noinspection GoDeprecation
conf := &retry.StateChangeConf{
Pending: []string{
"CANCELLING",
},
Target: []string{
"CANCELED",
},
Refresh: func() (interface{}, string, error) {
r, err := client.FlinkApplicationDeployments.Get(ctx, project, serviceName, applicationID, deploymentID)
if err != nil {
return nil, "", err
// Flink Application Deployment has a quite complicated state machine
// https://api.aiven.io/doc/#tag/Service:_Flink/operation/ServiceFlinkDeleteApplicationDeployment
// Retries until succeeds or exceeds the timeout
var isCancelled, isDeleted bool
for {
select {
case <-ctx.Done():
// The context itself already comes with delete timeout
return diag.Errorf("can't delete Flink Application Deployment: %s", ctx.Err())
case <-time.After(time.Second):
if aiven.IsNotFound(err) {
// Early exit for any operation within the loop
return nil
}
return r, r.Status, nil
},
Delay: 1 * time.Second,
Timeout: d.Timeout(schema.TimeoutDelete),
MinTimeout: 1 * time.Second,
}

_, err = conf.WaitForStateContext(ctx)
if err != nil {
return diag.Errorf("error waiting for Flink Application Deployment to become canceled: %s", err)
}

_, err = client.FlinkApplicationDeployments.Delete(ctx, project, serviceName, applicationID, deploymentID)
if err != nil {
return diag.Errorf("error deleting Flink Application Deployment: %v", err)
switch {
case !isCancelled:
_, err = client.FlinkApplicationDeployments.Cancel(ctx, project, serviceName, applicationID, deploymentID)
isCancelled = err == nil
case !isDeleted:
_, err = client.FlinkApplicationDeployments.Delete(ctx, project, serviceName, applicationID, deploymentID)
isDeleted = err == nil
default:
_, err = client.FlinkApplicationDeployments.Get(ctx, project, serviceName, applicationID, deploymentID)
}
}
}

return nil
}

// resourceFlinkApplicationDeploymentRead reads an existing Flink Application Deployment resource.
Expand Down

0 comments on commit f778a25

Please sign in to comment.