From 74ec657b4be0cbd36fea15a83446d0f14fff20d4 Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Fri, 3 Nov 2023 09:14:21 -0700 Subject: [PATCH] Add support for Kafka Topic User ACL management (#1056) * Kafka User Support * fix tests * update comments and refactor * fix docs * Update digitalocean/database/resource_database_user.go Co-authored-by: Andrew Starr-Bochicchio --------- Co-authored-by: Andrew Starr-Bochicchio --- .../database/datasource_database_user.go | 16 +++ .../database/resource_database_user.go | 110 +++++++++++++++++- .../database/resource_database_user_test.go | 84 +++++++++++++ docs/resources/database_user.md | 50 ++++++++ 4 files changed, 258 insertions(+), 2 deletions(-) diff --git a/digitalocean/database/datasource_database_user.go b/digitalocean/database/datasource_database_user.go index 4ab2da5ad..43b19eebd 100644 --- a/digitalocean/database/datasource_database_user.go +++ b/digitalocean/database/datasource_database_user.go @@ -36,6 +36,19 @@ func DataSourceDigitalOceanDatabaseUser() *schema.Resource { Type: schema.TypeString, Computed: true, }, + "settings": { + Type: schema.TypeList, + Computed: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "acl": { + Type: schema.TypeList, + Optional: true, + Elem: userACLSchema(), + }, + }, + }, + }, }, } } @@ -62,5 +75,8 @@ func dataSourceDigitalOceanDatabaseUserRead(ctx context.Context, d *schema.Resou d.Set("mysql_auth_plugin", user.MySQLSettings.AuthPlugin) } + if err := d.Set("settings", flattenUserSettings(user.Settings)); err != nil { + return diag.Errorf("Error setting user settings: %#v", err) + } return nil } diff --git a/digitalocean/database/resource_database_user.go b/digitalocean/database/resource_database_user.go index aecdaee97..0e26e417f 100644 --- a/digitalocean/database/resource_database_user.go +++ b/digitalocean/database/resource_database_user.go @@ -52,8 +52,19 @@ func ResourceDigitalOceanDatabaseUser() *schema.Resource { return old == godo.SQLAuthPluginCachingSHA2 && new == "" }, }, - - // Computed Properties + "settings": { + Type: schema.TypeList, + Optional: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "acl": { + Type: schema.TypeList, + Optional: true, + Elem: userACLSchema(), + }, + }, + }, + }, "role": { Type: schema.TypeString, Computed: true, @@ -67,6 +78,34 @@ func ResourceDigitalOceanDatabaseUser() *schema.Resource { } } +func userACLSchema() *schema.Resource { + return &schema.Resource{ + Schema: map[string]*schema.Schema{ + "id": { + Type: schema.TypeString, + Computed: true, + }, + "topic": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validation.NoZeroValues, + }, + "permission": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validation.StringInSlice([]string{ + "admin", + "consume", + "produce", + "produceconsume", + }, false), + }, + }, + } +} + func resourceDigitalOceanDatabaseUserCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { client := meta.(*config.CombinedConfig).GodoClient() clusterID := d.Get("cluster_id").(string) @@ -81,6 +120,10 @@ func resourceDigitalOceanDatabaseUserCreate(ctx context.Context, d *schema.Resou } } + if v, ok := d.GetOk("settings"); ok { + opts.Settings = expandUserSettings(v.([]interface{})) + } + // Prevent parallel creation of users for same cluster. key := fmt.Sprintf("digitalocean_database_cluster/%s/users", clusterID) mutexKV.Lock(key) @@ -95,6 +138,11 @@ func resourceDigitalOceanDatabaseUserCreate(ctx context.Context, d *schema.Resou d.SetId(makeDatabaseUserID(clusterID, user.Name)) log.Printf("[INFO] Database User Name: %s", user.Name) + // set userSettings only on CreateUser, due to CreateUser responses including `settings` but GetUser responses not including `settings` + if err := d.Set("settings", flattenUserSettings(user.Settings)); err != nil { + return diag.Errorf("Error setting user settings: %#v", err) + } + setDatabaseUserAttributes(d, user) return nil @@ -203,3 +251,61 @@ func resourceDigitalOceanDatabaseUserImport(d *schema.ResourceData, meta interfa func makeDatabaseUserID(clusterID string, name string) string { return fmt.Sprintf("%s/user/%s", clusterID, name) } + +func expandUserSettings(raw []interface{}) *godo.DatabaseUserSettings { + if len(raw) == 0 || raw[0] == nil { + return &godo.DatabaseUserSettings{} + } + userSettingsConfig := raw[0].(map[string]interface{}) + + userSettings := &godo.DatabaseUserSettings{ + ACL: expandUserACLs(userSettingsConfig["acl"].([]interface{})), + } + return userSettings +} + +func expandUserACLs(rawACLs []interface{}) []*godo.KafkaACL { + acls := make([]*godo.KafkaACL, 0, len(rawACLs)) + for _, rawACL := range rawACLs { + a := rawACL.(map[string]interface{}) + acl := &godo.KafkaACL{ + Topic: a["topic"].(string), + Permission: a["permission"].(string), + } + acls = append(acls, acl) + } + return acls +} + +func flattenUserSettings(settings *godo.DatabaseUserSettings) []map[string]interface{} { + result := make([]map[string]interface{}, 0, 1) + if settings != nil { + r := make(map[string]interface{}) + r["acl"] = flattenUserACLs(settings.ACL) + result = append(result, r) + } + return result +} + +func flattenUserACLs(acls []*godo.KafkaACL) []map[string]interface{} { + result := make([]map[string]interface{}, len(acls)) + for i, acl := range acls { + item := make(map[string]interface{}) + item["id"] = acl.ID + item["topic"] = acl.Topic + item["permission"] = normalizePermission(acl.Permission) + result[i] = item + } + return result +} + +func normalizePermission(p string) string { + pLower := strings.ToLower(p) + switch pLower { + case "admin", "produce", "consume": + return pLower + case "produceconsume", "produce_consume", "readwrite", "read_write": + return "produceconsume" + } + return "" +} diff --git a/digitalocean/database/resource_database_user_test.go b/digitalocean/database/resource_database_user_test.go index 460e3b92b..33fe5c551 100644 --- a/digitalocean/database/resource_database_user_test.go +++ b/digitalocean/database/resource_database_user_test.go @@ -177,6 +177,57 @@ func TestAccDigitalOceanDatabaseUser_MySQLAuth(t *testing.T) { }) } +func TestAccDigitalOceanDatabaseUser_KafkaACLs(t *testing.T) { + var databaseUser godo.DatabaseUser + databaseClusterName := acceptance.RandomTestName() + databaseUserName := acceptance.RandomTestName() + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acceptance.TestAccPreCheck(t) }, + ProviderFactories: acceptance.TestAccProviderFactories, + CheckDestroy: testAccCheckDigitalOceanDatabaseUserDestroy, + Steps: []resource.TestStep{ + { + Config: fmt.Sprintf(testAccCheckDigitalOceanDatabaseUserConfigKafkaACL, databaseClusterName, databaseUserName), + Check: resource.ComposeTestCheckFunc( + testAccCheckDigitalOceanDatabaseUserExists("digitalocean_database_user.foobar_user", &databaseUser), + testAccCheckDigitalOceanDatabaseUserAttributes(&databaseUser, databaseUserName), + resource.TestCheckResourceAttr( + "digitalocean_database_user.foobar_user", "name", databaseUserName), + resource.TestCheckResourceAttrSet( + "digitalocean_database_user.foobar_user", "role"), + resource.TestCheckResourceAttrSet( + "digitalocean_database_user.foobar_user", "password"), + resource.TestCheckResourceAttrSet( + "digitalocean_database_user.foobar_user", "settings.0.acl.0.id"), + resource.TestCheckResourceAttr( + "digitalocean_database_user.foobar_user", "settings.0.acl.0.topic", "topic-1"), + resource.TestCheckResourceAttr( + "digitalocean_database_user.foobar_user", "settings.0.acl.0.permission", "admin"), + resource.TestCheckResourceAttrSet( + "digitalocean_database_user.foobar_user", "settings.0.acl.1.id"), + resource.TestCheckResourceAttr( + "digitalocean_database_user.foobar_user", "settings.0.acl.1.topic", "topic-2"), + resource.TestCheckResourceAttr( + "digitalocean_database_user.foobar_user", "settings.0.acl.1.permission", "produceconsume"), + resource.TestCheckResourceAttrSet( + "digitalocean_database_user.foobar_user", "settings.0.acl.2.id"), + resource.TestCheckResourceAttr( + "digitalocean_database_user.foobar_user", "settings.0.acl.2.topic", "topic-*"), + resource.TestCheckResourceAttr( + "digitalocean_database_user.foobar_user", "settings.0.acl.2.permission", "produce"), + resource.TestCheckResourceAttrSet( + "digitalocean_database_user.foobar_user", "settings.0.acl.3.id"), + resource.TestCheckResourceAttr( + "digitalocean_database_user.foobar_user", "settings.0.acl.3.topic", "topic-*"), + resource.TestCheckResourceAttr( + "digitalocean_database_user.foobar_user", "settings.0.acl.3.permission", "consume"), + ), + }, + }, + }) +} + func testAccCheckDigitalOceanDatabaseUserDestroy(s *terraform.State) error { client := acceptance.TestAccProvider.Meta().(*config.CombinedConfig).GodoClient() @@ -361,6 +412,39 @@ resource "digitalocean_database_user" "foobar_user" { mysql_auth_plugin = "mysql_native_password" }` +const testAccCheckDigitalOceanDatabaseUserConfigKafkaACL = ` +resource "digitalocean_database_cluster" "foobar" { + name = "%s" + engine = "kafka" + version = "3.5" + size = "db-s-1vcpu-2gb" + region = "nyc1" + node_count = 3 +} + +resource "digitalocean_database_user" "foobar_user" { + cluster_id = digitalocean_database_cluster.foobar.id + name = "%s" + settings { + acl { + topic = "topic-1" + permission = "admin" + } + acl { + topic = "topic-2" + permission = "produceconsume" + } + acl { + topic = "topic-*" + permission = "produce" + } + acl { + topic = "topic-*" + permission = "consume" + } + } +}` + const testAccCheckDigitalOceanDatabaseUserConfigMySQLAuthUpdate = ` resource "digitalocean_database_cluster" "foobar" { name = "%s" diff --git a/docs/resources/database_user.md b/docs/resources/database_user.md index e2db5719e..0e6a3998d 100644 --- a/docs/resources/database_user.md +++ b/docs/resources/database_user.md @@ -51,6 +51,42 @@ resource "digitalocean_database_user" "user-example" { } ``` +### Create a new user for a Kafka database cluster +```hcl +resource "digitalocean_database_cluster" "kafka-example" { + name = "example-kafka-cluster" + engine = "kafka" + version = "3.5" + size = "db-s-1vcpu-2gb" + region = "nyc1" + node_count = 3 +} + +resource "digitalocean_database_kafka_topic" "foobar_topic" { + cluster_id = digitalocean_database_cluster.foobar.id + name = "topic-1" +} + +resource "digitalocean_database_user" "foobar_user" { + cluster_id = digitalocean_database_cluster.foobar.id + name = "example-user" + settings { + acl { + topic = "topic-1" + permission = "produce" + } + acl { + topic = "topic-2" + permission = "produceconsume" + } + acl { + topic = "topic-*" + permission = "consume" + } + } +} +``` + ## Argument Reference The following arguments are supported: @@ -58,6 +94,17 @@ The following arguments are supported: * `cluster_id` - (Required) The ID of the original source database cluster. * `name` - (Required) The name for the database user. * `mysql_auth_plugin` - (Optional) The authentication method to use for connections to the MySQL user account. The valid values are `mysql_native_password` or `caching_sha2_password` (this is the default). +* `settings` - (Optional) Contains optional settings for the user. +The `settings` block is documented below. + +`settings` supports the following: + +* `acl` - (Optional) A set of ACLs (Access Control Lists) specifying permission on topics with a Kafka cluster. The properties of an individual ACL are described below: + +An individual ACL includes the following: + +* `topic` - (Required) A regex for matching the topic(s) that this ACL should apply to. +* `permission` - (Required) The permission level applied to the ACL. This includes "admin", "consume", "produce", and "produceconsume". "admin" allows for producing and consuming as well as add/delete/update permission for topics. "consume" allows only for reading topic messages. "produce" allows only for writing topic messages. "produceconsume" allows for both reading and writing topic messages. ## Attributes Reference @@ -66,6 +113,9 @@ In addition to the above arguments, the following attributes are exported: * `role` - Role for the database user. The value will be either "primary" or "normal". * `password` - Password for the database user. +For individual ACLs for Kafka topics, the following attributes are exported: +* `id` - An identifier for the ACL, this will be automatically assigned when you create an ACL entry + ## Import Database user can be imported using the `id` of the source database cluster