diff --git a/CHANGELOG.md b/CHANGELOG.md index 5afb975a8..f46707a5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ nav_order: 1 +- Fix `aiven_kafka_schema` version update + ## [4.14.0] - 2024-02-20 - Refactor deprecated `resource.StateChangeConf`/`resource.StateRefreshFunc` usage to their equivalent with `retry` diff --git a/internal/sdkprovider/service/kafkaschema/kafka_schema.go b/internal/sdkprovider/service/kafkaschema/kafka_schema.go index 8c13524fa..ee36f35e4 100644 --- a/internal/sdkprovider/service/kafkaschema/kafka_schema.go +++ b/internal/sdkprovider/service/kafkaschema/kafka_schema.go @@ -6,12 +6,14 @@ import ( "fmt" "reflect" "regexp" + "time" "github.com/aiven/aiven-go-client/v2" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/structure" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" + "golang.org/x/exp/slices" "github.com/aiven/terraform-provider-aiven/internal/common" "github.com/aiven/terraform-provider-aiven/internal/schemautil" @@ -115,8 +117,8 @@ func normalizeJSONOrProtobufString(i any) string { func ResourceKafkaSchema() *schema.Resource { return &schema.Resource{ Description: "The Kafka Schema resource allows the creation and management of Aiven Kafka Schemas.", - CreateContext: resourceKafkaSchemaCreate, - UpdateContext: resourceKafkaSchemaUpdate, + CreateContext: resourceKafkaSchemaUpsert, + UpdateContext: resourceKafkaSchemaUpsert, ReadContext: resourceKafkaSchemaRead, DeleteContext: resourceKafkaSchemaDelete, Importer: &schema.ResourceImporter{ @@ -129,95 +131,15 @@ func ResourceKafkaSchema() *schema.Resource { } } -func kafkaSchemaSubjectGetLastVersion( - ctx context.Context, - m interface{}, - project string, - serviceName string, - subjectName string, -) (int, error) { - client := m.(*aiven.Client) - - r, err := client.KafkaSubjectSchemas.GetVersions(ctx, project, serviceName, subjectName) - if err != nil { - return 0, err - } - - var latestVersion int - for _, v := range r.Versions { - if v > latestVersion { - latestVersion = v - } - } - - return latestVersion, nil -} - -// Aiven Kafka schema creates a new Kafka Schema Subject with a new version, and if Kafka -// Schema subject with a given name already exists API will validate new Kafka Schema -// configuration against the previous version for compatibility and if compatible will -// create a new version for the same Kafka Schema Subject -func resourceKafkaSchemaCreate(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics { +func resourceKafkaSchemaUpsert(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics { project := d.Get("project").(string) serviceName := d.Get("service_name").(string) subjectName := d.Get("subject_name").(string) client := m.(*aiven.Client) - - // create Kafka Schema Subject - _, err := client.KafkaSubjectSchemas.Add( - ctx, - project, - serviceName, - subjectName, - aiven.KafkaSchemaSubject{ - Schema: d.Get("schema").(string), - SchemaType: d.Get("schema_type").(string), - }, - ) - if err != nil { - return diag.Errorf("unable to create schema: %s", err) - } - - // set compatibility level if defined for a newly created Kafka Schema Subject - if compatibility, ok := d.GetOk("compatibility_level"); ok { - _, err := client.KafkaSubjectSchemas.UpdateConfiguration( - ctx, - project, - serviceName, - subjectName, - compatibility.(string), - ) - if err != nil { - return diag.Errorf("unable to update configuration: %s", err) - } - } - - version, err := kafkaSchemaSubjectGetLastVersion(ctx, m, project, serviceName, subjectName) - if err != nil { - return diag.Errorf("unable to get last version: %s", err) - } - - // newly created versions start from 1 - if version == 0 { - return diag.Errorf("kafka schema subject after creation has an empty list of versions") - } - - d.SetId(schemautil.BuildResourceID(project, serviceName, subjectName)) - - return resourceKafkaSchemaRead(ctx, d, m) -} - -func resourceKafkaSchemaUpdate(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics { - project, serviceName, subjectName, err := schemautil.SplitResourceID3(d.Id()) - if err != nil { - return diag.FromErr(err) - } - - client := m.(*aiven.Client) - if d.HasChange("schema") { - _, err := client.KafkaSubjectSchemas.Add( + // This call returns Schema ID, not its version + s, err := client.KafkaSubjectSchemas.Add( ctx, project, serviceName, @@ -227,27 +149,67 @@ func resourceKafkaSchemaUpdate(ctx context.Context, d *schema.ResourceData, m in SchemaType: d.Get("schema_type").(string), }, ) + if err != nil { - return diag.Errorf("unable to update schema: %s", err) + return diag.Errorf("unable to add schema: %s", err) + } + + // Gets Schema's version by its ID + version, err := getSchemaVersion(ctx, client, project, serviceName, subjectName, s.Id) + if err != nil { + return diag.Errorf("unable to get schema version: %s", err) + } + + if err := d.Set("version", version); err != nil { + return diag.FromErr(err) } } // if compatibility_level has changed and the new value is not empty if compatibility, ok := d.GetOk("compatibility_level"); ok { - _, err = client.KafkaSubjectSchemas.UpdateConfiguration( + _, err := client.KafkaSubjectSchemas.UpdateConfiguration( ctx, project, serviceName, subjectName, - compatibility.(string)) + compatibility.(string), + ) + if err != nil { return diag.Errorf("unable to update configuration: %s", err) } } + d.SetId(schemautil.BuildResourceID(project, serviceName, subjectName)) return resourceKafkaSchemaRead(ctx, d, m) } +// getSchemaVersion polls until the version with given Schema ID appears in the version list +func getSchemaVersion(ctx context.Context, client *aiven.Client, project, serviceName, subjectName string, id int) (int, error) { + for { + select { + case <-ctx.Done(): + return 0, ctx.Err() + case <-time.After(time.Second): + versions, err := client.KafkaSubjectSchemas.GetVersions(ctx, project, serviceName, subjectName) + if err != nil { + return 0, err + } + + for _, v := range versions.Versions { + s, err := client.KafkaSubjectSchemas.Get(ctx, project, serviceName, subjectName, v) + if err != nil { + return 0, err + } + + if s.Version.Id == id { + return s.Version.Version, nil + } + } + } + } +} + func resourceKafkaSchemaRead(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics { project, serviceName, subjectName, err := schemautil.SplitResourceID3(d.Id()) if err != nil { @@ -255,15 +217,26 @@ func resourceKafkaSchemaRead(ctx context.Context, d *schema.ResourceData, m inte } client := m.(*aiven.Client) + version := d.Get("version").(int) + if version == 0 { + // For data source type and "import" + r, err := client.KafkaSubjectSchemas.GetVersions(ctx, project, serviceName, subjectName) + if err != nil { + return diag.FromErr(schemautil.ResourceReadHandleNotFound(err, d)) + } + version = slices.Max(r.Versions) + if err := d.Set("version", version); err != nil { + return diag.FromErr(err) + } + } - version, err := kafkaSchemaSubjectGetLastVersion(ctx, m, project, serviceName, subjectName) + s, err := client.KafkaSubjectSchemas.Get(ctx, project, serviceName, subjectName, version) if err != nil { - return diag.FromErr(schemautil.ResourceReadHandleNotFound(err, d)) + return diag.FromErr(err) } - r, err := client.KafkaSubjectSchemas.Get(ctx, project, serviceName, subjectName, version) - if err != nil { - return diag.FromErr(schemautil.ResourceReadHandleNotFound(err, d)) + if err := d.Set("schema", s.Version.Schema); err != nil { + return diag.FromErr(err) } if err := d.Set("project", project); err != nil { @@ -275,12 +248,6 @@ func resourceKafkaSchemaRead(ctx context.Context, d *schema.ResourceData, m inte if err := d.Set("subject_name", subjectName); err != nil { return diag.FromErr(err) } - if err := d.Set("version", version); err != nil { - return diag.FromErr(err) - } - if err := d.Set("schema", r.Version.Schema); err != nil { - return diag.FromErr(err) - } c, err := client.KafkaSubjectSchemas.GetConfiguration(ctx, project, serviceName, subjectName) if err != nil { diff --git a/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go b/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go index 46d3fbf0b..930e5c45f 100644 --- a/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go +++ b/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go @@ -165,6 +165,15 @@ func TestAccAivenKafkaSchema_basic(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "schema_type", "AVRO"), ), }, + // Reverts changes and gets version=1 + { + Config: testAccKafkaSchemaResource(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckAivenKafkaSchemaAttributes("data.aiven_kafka_schema.schema"), + resource.TestCheckResourceAttr(resourceName, "version", "1"), + resource.TestCheckResourceAttr(resourceName, "schema_type", "AVRO"), + ), + }, { Config: testAccKafkaSchemaResourceInvalidUpdate(rName), ExpectError: regexp.MustCompile("schema is not compatible with previous version"),