Skip to content

Commit

Permalink
fix(aiven_kafka_schema): update schema version (#1609)
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov authored Feb 22, 2024
1 parent 562ce34 commit a88d149
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 99 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ nav_order: 1
<!-- TODO: uncomment when dragonfly is supported -->
<!-- - Dragonfly support -->

- Fix `aiven_kafka_schema` version update

## [4.14.0] - 2024-02-20

- Refactor deprecated `resource.StateChangeConf`/`resource.StateRefreshFunc` usage to their equivalent with `retry`
Expand Down
165 changes: 66 additions & 99 deletions internal/sdkprovider/service/kafkaschema/kafka_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"fmt"
"reflect"
"regexp"
"time"

"github.com/aiven/aiven-go-client/v2"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/structure"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"golang.org/x/exp/slices"

"github.com/aiven/terraform-provider-aiven/internal/common"
"github.com/aiven/terraform-provider-aiven/internal/schemautil"
Expand Down Expand Up @@ -115,8 +117,8 @@ func normalizeJSONOrProtobufString(i any) string {
func ResourceKafkaSchema() *schema.Resource {
return &schema.Resource{
Description: "The Kafka Schema resource allows the creation and management of Aiven Kafka Schemas.",
CreateContext: resourceKafkaSchemaCreate,
UpdateContext: resourceKafkaSchemaUpdate,
CreateContext: resourceKafkaSchemaUpsert,
UpdateContext: resourceKafkaSchemaUpsert,
ReadContext: resourceKafkaSchemaRead,
DeleteContext: resourceKafkaSchemaDelete,
Importer: &schema.ResourceImporter{
Expand All @@ -129,95 +131,15 @@ func ResourceKafkaSchema() *schema.Resource {
}
}

func kafkaSchemaSubjectGetLastVersion(
ctx context.Context,
m interface{},
project string,
serviceName string,
subjectName string,
) (int, error) {
client := m.(*aiven.Client)

r, err := client.KafkaSubjectSchemas.GetVersions(ctx, project, serviceName, subjectName)
if err != nil {
return 0, err
}

var latestVersion int
for _, v := range r.Versions {
if v > latestVersion {
latestVersion = v
}
}

return latestVersion, nil
}

// Aiven Kafka schema creates a new Kafka Schema Subject with a new version, and if Kafka
// Schema subject with a given name already exists API will validate new Kafka Schema
// configuration against the previous version for compatibility and if compatible will
// create a new version for the same Kafka Schema Subject
func resourceKafkaSchemaCreate(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
func resourceKafkaSchemaUpsert(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
project := d.Get("project").(string)
serviceName := d.Get("service_name").(string)
subjectName := d.Get("subject_name").(string)

client := m.(*aiven.Client)

// create Kafka Schema Subject
_, err := client.KafkaSubjectSchemas.Add(
ctx,
project,
serviceName,
subjectName,
aiven.KafkaSchemaSubject{
Schema: d.Get("schema").(string),
SchemaType: d.Get("schema_type").(string),
},
)
if err != nil {
return diag.Errorf("unable to create schema: %s", err)
}

// set compatibility level if defined for a newly created Kafka Schema Subject
if compatibility, ok := d.GetOk("compatibility_level"); ok {
_, err := client.KafkaSubjectSchemas.UpdateConfiguration(
ctx,
project,
serviceName,
subjectName,
compatibility.(string),
)
if err != nil {
return diag.Errorf("unable to update configuration: %s", err)
}
}

version, err := kafkaSchemaSubjectGetLastVersion(ctx, m, project, serviceName, subjectName)
if err != nil {
return diag.Errorf("unable to get last version: %s", err)
}

// newly created versions start from 1
if version == 0 {
return diag.Errorf("kafka schema subject after creation has an empty list of versions")
}

d.SetId(schemautil.BuildResourceID(project, serviceName, subjectName))

return resourceKafkaSchemaRead(ctx, d, m)
}

func resourceKafkaSchemaUpdate(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
project, serviceName, subjectName, err := schemautil.SplitResourceID3(d.Id())
if err != nil {
return diag.FromErr(err)
}

client := m.(*aiven.Client)

if d.HasChange("schema") {
_, err := client.KafkaSubjectSchemas.Add(
// This call returns Schema ID, not its version
s, err := client.KafkaSubjectSchemas.Add(
ctx,
project,
serviceName,
Expand All @@ -227,43 +149,94 @@ func resourceKafkaSchemaUpdate(ctx context.Context, d *schema.ResourceData, m in
SchemaType: d.Get("schema_type").(string),
},
)

if err != nil {
return diag.Errorf("unable to update schema: %s", err)
return diag.Errorf("unable to add schema: %s", err)
}

// Gets Schema's version by its ID
version, err := getSchemaVersion(ctx, client, project, serviceName, subjectName, s.Id)
if err != nil {
return diag.Errorf("unable to get schema version: %s", err)
}

if err := d.Set("version", version); err != nil {
return diag.FromErr(err)
}
}

// if compatibility_level has changed and the new value is not empty
if compatibility, ok := d.GetOk("compatibility_level"); ok {
_, err = client.KafkaSubjectSchemas.UpdateConfiguration(
_, err := client.KafkaSubjectSchemas.UpdateConfiguration(
ctx,
project,
serviceName,
subjectName,
compatibility.(string))
compatibility.(string),
)

if err != nil {
return diag.Errorf("unable to update configuration: %s", err)
}
}

d.SetId(schemautil.BuildResourceID(project, serviceName, subjectName))
return resourceKafkaSchemaRead(ctx, d, m)
}

// getSchemaVersion polls until the version with given Schema ID appears in the version list
func getSchemaVersion(ctx context.Context, client *aiven.Client, project, serviceName, subjectName string, id int) (int, error) {
for {
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-time.After(time.Second):
versions, err := client.KafkaSubjectSchemas.GetVersions(ctx, project, serviceName, subjectName)
if err != nil {
return 0, err
}

for _, v := range versions.Versions {
s, err := client.KafkaSubjectSchemas.Get(ctx, project, serviceName, subjectName, v)
if err != nil {
return 0, err
}

if s.Version.Id == id {
return s.Version.Version, nil
}
}
}
}
}

func resourceKafkaSchemaRead(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
project, serviceName, subjectName, err := schemautil.SplitResourceID3(d.Id())
if err != nil {
return diag.FromErr(err)
}

client := m.(*aiven.Client)
version := d.Get("version").(int)
if version == 0 {
// For data source type and "import"
r, err := client.KafkaSubjectSchemas.GetVersions(ctx, project, serviceName, subjectName)
if err != nil {
return diag.FromErr(schemautil.ResourceReadHandleNotFound(err, d))
}
version = slices.Max(r.Versions)
if err := d.Set("version", version); err != nil {
return diag.FromErr(err)
}
}

version, err := kafkaSchemaSubjectGetLastVersion(ctx, m, project, serviceName, subjectName)
s, err := client.KafkaSubjectSchemas.Get(ctx, project, serviceName, subjectName, version)
if err != nil {
return diag.FromErr(schemautil.ResourceReadHandleNotFound(err, d))
return diag.FromErr(err)
}

r, err := client.KafkaSubjectSchemas.Get(ctx, project, serviceName, subjectName, version)
if err != nil {
return diag.FromErr(schemautil.ResourceReadHandleNotFound(err, d))
if err := d.Set("schema", s.Version.Schema); err != nil {
return diag.FromErr(err)
}

if err := d.Set("project", project); err != nil {
Expand All @@ -275,12 +248,6 @@ func resourceKafkaSchemaRead(ctx context.Context, d *schema.ResourceData, m inte
if err := d.Set("subject_name", subjectName); err != nil {
return diag.FromErr(err)
}
if err := d.Set("version", version); err != nil {
return diag.FromErr(err)
}
if err := d.Set("schema", r.Version.Schema); err != nil {
return diag.FromErr(err)
}

c, err := client.KafkaSubjectSchemas.GetConfiguration(ctx, project, serviceName, subjectName)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions internal/sdkprovider/service/kafkaschema/kafka_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ func TestAccAivenKafkaSchema_basic(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "schema_type", "AVRO"),
),
},
// Reverts changes and gets version=1
{
Config: testAccKafkaSchemaResource(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAivenKafkaSchemaAttributes("data.aiven_kafka_schema.schema"),
resource.TestCheckResourceAttr(resourceName, "version", "1"),
resource.TestCheckResourceAttr(resourceName, "schema_type", "AVRO"),
),
},
{
Config: testAccKafkaSchemaResourceInvalidUpdate(rName),
ExpectError: regexp.MustCompile("schema is not compatible with previous version"),
Expand Down

0 comments on commit a88d149

Please sign in to comment.