Skip to content

Commit

Permalink
feat(kafka_quota): added support for kafka quota
Browse files Browse the repository at this point in the history
  • Loading branch information
vmyroslav committed Dec 19, 2024
1 parent 8a144fe commit 23135e2
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 65 deletions.
54 changes: 27 additions & 27 deletions internal/acctest/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestCompositionBuilder_DataSourceAndResource(t *testing.T) {
name = "{{ .name }}"
}`)
registry.MustAddTemplate(t, "project", `resource "aiven_project" "example_project" {
project = "{{ .name }}"
project = "{{ .name }}"
organization_id = data.aiven_organization.main.id
}`)

Expand All @@ -174,7 +174,7 @@ func TestCompositionBuilder_DataSourceAndResource(t *testing.T) {
name = "example-org"
}
resource "aiven_project" "example_project" {
project = "example-project"
project = "example-project"
organization_id = data.aiven_organization.main.id
}`

Expand All @@ -198,23 +198,23 @@ func TestCompositionBuilder(t *testing.T) {
templates: map[string]string{
"kafka": `resource "aiven_kafka" "example_kafka" {
project = data.aiven_project.example_project.project
cloud_name = "{{ .cloud_name }}"
plan = "{{ .plan }}"
service_name = "{{ .service_name }}"
maintenance_window_dow = "{{ .maintenance_window_dow }}"
cloud_name = "{{ .cloud_name }}"
plan = "{{ .plan }}"
service_name = "{{ .service_name }}"
maintenance_window_dow = "{{ .maintenance_window_dow }}"
maintenance_window_time = "{{ .maintenance_window_time }}"
kafka_user_config {
kafka_rest = {{ .kafka_rest }}
kafka_connect = {{ .kafka_connect }}
schema_registry = {{ .schema_registry }}
kafka_rest = "{{ .kafka_rest }}"
kafka_connect = "{{ .kafka_connect }}"
schema_registry = "{{ .schema_registry }}"
kafka_version = "{{ .kafka_version }}"
kafka {
group_max_session_timeout_ms = {{ .group_max_session_timeout_ms }}
log_retention_bytes = {{ .log_retention_bytes }}
group_max_session_timeout_ms = "{{ .group_max_session_timeout_ms }}"
log_retention_bytes = "{{ .log_retention_bytes }}"
}
public_access {
kafka_rest = {{ .kafka_rest_public }}
kafka_connect = {{ .kafka_connect_public }}
kafka_rest = "{{ .kafka_rest_public }}"
kafka_connect = "{{ .kafka_connect_public }}"
}
}
}`,
Expand Down Expand Up @@ -269,23 +269,23 @@ func TestCompositionBuilder(t *testing.T) {
}
resource "aiven_kafka" "example_kafka" {
project = data.aiven_project.example_project.project
cloud_name = "google-europe-west1"
plan = "business-4"
service_name = "example-kafka"
maintenance_window_dow = "monday"
cloud_name = "google-europe-west1"
plan = "business-4"
service_name = "example-kafka"
maintenance_window_dow = "monday"
maintenance_window_time = "10:00:00"
kafka_user_config {
kafka_rest = true
kafka_connect = true
schema_registry = true
kafka_rest = "true"
kafka_connect = "true"
schema_registry = "true"
kafka_version = "3.5"
kafka {
group_max_session_timeout_ms = 70000
log_retention_bytes = 1000000000
group_max_session_timeout_ms = "70000"
log_retention_bytes = "1000000000"
}
public_access {
kafka_rest = true
kafka_connect = true
kafka_rest = "true"
kafka_connect = "true"
}
}
}
Expand All @@ -306,9 +306,9 @@ resource "aiven_kafka_user" "example_service_user" {
plan = "{{ .plan }}"
}`,
"kafka_config": ` kafka_user_config {
kafka_rest = {{ .kafka_rest }}
schema_registry = {{ .schema_registry }}
}`,
kafka_rest = {{ .kafka_rest }}
schema_registry = {{ .schema_registry }}
}`,
},
compositions: []struct {
templateKey string
Expand Down
20 changes: 4 additions & 16 deletions internal/sdkprovider/service/kafka/kafka_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ Quotas can be applied based on user, client-id, or both.
The most relevant quota is chosen for each connection.
All connections within a quota group share the same quota.
It is possible to set default quotas for each (user, client-id), user or client-id group by specifying 'default'`,
ValidateFunc: validation.All(
validation.StringLenBetween(1, 255),
validation.StringIsNotEmpty,
),
ValidateFunc: validation.StringLenBetween(1, 255),
},
"consumer_byte_rate": {
Type: schema.TypeInt,
Expand All @@ -51,10 +48,7 @@ It is possible to set default quotas for each (user, client-id), user or client-
Defines the bandwidth limit in bytes/sec for each group of clients sharing a quota.
Every distinct client group is allocated a specific quota, as defined by the cluster, on a per-broker basis.
Exceeding this limit results in client throttling.`,
ValidateFunc: validation.All(
validation.IntAtLeast(0),
validation.IntAtMost(1073741824),
),
ValidateFunc: validation.IntBetween(0, 1073741824),
},
"producer_byte_rate": {
Type: schema.TypeInt,
Expand All @@ -64,10 +58,7 @@ Exceeding this limit results in client throttling.`,
Defines the bandwidth limit in bytes/sec for each group of clients sharing a quota.
Every distinct client group is allocated a specific quota, as defined by the cluster, on a per-broker basis.
Exceeding this limit results in client throttling.`,
ValidateFunc: validation.All(
validation.IntAtLeast(0),
validation.IntAtMost(1073741824),
),
ValidateFunc: validation.IntBetween(0, 1073741824),
},
"request_percentage": {
Type: schema.TypeInt,
Expand All @@ -77,10 +68,7 @@ Exceeding this limit results in client throttling.`,
Sets the maximum percentage of CPU time that a client group can use on request handler I/O and network threads per broker within a quota window.
Exceeding this limit triggers throttling.
The quota, expressed as a percentage, also indicates the total allowable CPU usage for the client groups sharing the quota.`,
ValidateFunc: validation.All(
validation.IntAtLeast(0),
validation.IntAtMost(100),
),
ValidateFunc: validation.IntBetween(0, 100),
},
}

Expand Down
47 changes: 25 additions & 22 deletions internal/sdkprovider/service/kafka/kafka_quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func TestAccAivenKafkaQuota(t *testing.T) {
randName = acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)
serviceName = fmt.Sprintf("test-acc-sr-%s", randName)
projectName = os.Getenv("AIVEN_PROJECT_NAME")

user = fmt.Sprintf("acc_test_user_%s", randName)
clientID = fmt.Sprintf("acc_test_client_%s", randName)
)

// Add templates
Expand All @@ -35,12 +38,12 @@ data "aiven_project" "foo" {

registry.MustAddTemplate(t, "aiven_kafka", `
resource "aiven_kafka" "bar" {
project = data.aiven_project.foo.project
cloud_name = "google-europe-west1"
plan = "startup-2"
service_name = "{{ .service_name }}"
maintenance_window_dow = "monday"
maintenance_window_time = "10:00:00"
project = data.aiven_project.foo.project
cloud_name = "google-europe-west1"
plan = "startup-2"
service_name = "{{ .service_name }}"
maintenance_window_dow = "monday"
maintenance_window_time = "10:00:00"
}`)

registry.MustAddTemplate(t, "kafka_quota_full", `
Expand Down Expand Up @@ -70,7 +73,7 @@ resource "aiven_kafka_quota" "{{ .resource_name }}" {
producer_byte_rate = {{ .producer_byte_rate }}
}`)

registry.MustAddTemplate(t, "invalid", `
registry.MustAddTemplate(t, "wrong_configuration", `
resource "aiven_kafka_quota" "{{ .resource_name }}" {
project = data.aiven_project.foo.project
service_name = aiven_kafka.bar.service_name
Expand Down Expand Up @@ -98,19 +101,19 @@ resource "aiven_kafka_quota" "{{ .resource_name }}" {
Add("kafka_quota_full", map[string]any{
"resource_name": "full",
"service_name": serviceName,
"user": fmt.Sprintf("acc_test_user_%s", randName),
"client_id": fmt.Sprintf("acc_test_client_%s", randName),
"user": user,
"client_id": clientID,
"consumer_byte_rate": 1000,
"producer_byte_rate": 1000,
"request_percentage": 101,
}).
MustRender(t),
ExpectError: regexp.MustCompile(`expected .+ to be at (?:most|least) \(\d+\), got -?\d+`),
ExpectError: regexp.MustCompile(`expected .+ to be in the range \(\d+ - \d+\), got \d+`),
},
{
Config: newComposition().
Add("invalid", map[string]any{
"resource_name": "invalid",
Add("wrong_configuration", map[string]any{
"resource_name": "full",
"service_name": serviceName,
"consumer_byte_rate": 1000,
"producer_byte_rate": 1000,
Expand All @@ -124,31 +127,31 @@ resource "aiven_kafka_quota" "{{ .resource_name }}" {
Add("kafka_quota_full", map[string]any{
"resource_name": "full",
"service_name": serviceName,
"user": fmt.Sprintf("acc_test_user_%s", randName),
"client_id": fmt.Sprintf("acc_test_client_%s", randName),
"user": user,
"client_id": clientID,
"consumer_byte_rate": 1000,
"producer_byte_rate": 1000,
"request_percentage": 10,
}).
Add("kafka_quota_user", map[string]any{
"resource_name": "user",
"service_name": serviceName,
"user": fmt.Sprintf("acc_test_user_%s", randName),
"user": user,
"request_percentage": 20,
}).
MustRender(t),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "project", projectName),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "service_name", serviceName),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "user", fmt.Sprintf("acc_test_user_%s", randName)),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "client_id", fmt.Sprintf("acc_test_client_%s", randName)),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "user", user),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "client_id", clientID),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "consumer_byte_rate", "1000"),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "producer_byte_rate", "1000"),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.full", kafkaQuotaResource), "request_percentage", "10"),

resource.TestCheckResourceAttr(fmt.Sprintf("%s.user", kafkaQuotaResource), "project", projectName),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.user", kafkaQuotaResource), "service_name", serviceName),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.user", kafkaQuotaResource), "user", fmt.Sprintf("acc_test_user_%s", randName)),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.user", kafkaQuotaResource), "user", user),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.user", kafkaQuotaResource), "request_percentage", "20"),
),
},
Expand All @@ -157,14 +160,14 @@ resource "aiven_kafka_quota" "{{ .resource_name }}" {
Add("kafka_quota_client_id", map[string]any{
"resource_name": "client",
"service_name": serviceName,
"client_id": fmt.Sprintf("acc_test_client_%s", randName),
"client_id": clientID,
"producer_byte_rate": 1000,
}).
MustRender(t),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "project", projectName),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "service_name", serviceName),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "client_id", fmt.Sprintf("acc_test_client_%s", randName)),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "client_id", clientID),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "producer_byte_rate", "1000"),
),
},
Expand All @@ -174,14 +177,14 @@ resource "aiven_kafka_quota" "{{ .resource_name }}" {
Add("kafka_quota_client_id", map[string]any{
"resource_name": "client",
"service_name": serviceName,
"client_id": fmt.Sprintf("acc_test_client_%s", randName),
"client_id": clientID,
"producer_byte_rate": 1000,
}).
MustRender(t),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "project", projectName),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "service_name", serviceName),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "client_id", fmt.Sprintf("acc_test_client_%s", randName)),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "client_id", clientID),
resource.TestCheckResourceAttr(fmt.Sprintf("%s.client", kafkaQuotaResource), "producer_byte_rate", "1000"),
),
},
Expand Down

0 comments on commit 23135e2

Please sign in to comment.