Skip to content

Commit

Permalink
Add support for kafka endpoints with sasl options (#342)
Browse files Browse the repository at this point in the history
* Add support for kafka endpoints with sasl options

* Minor Kafka updates based on review feedback

* Use user instead of username for consistency
  • Loading branch information
kellymclaughlin authored Jan 13, 2021
1 parent 935066b commit 63ac643
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 2 deletions.
45 changes: 43 additions & 2 deletions fastly/block_fastly_service_v1_logging_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (h *KafkaServiceAttributeHandler) Register(s *schema.Resource) error {
"required_acks": {
Type: schema.TypeString,
Optional: true,
Description: "The Number of acknowledgements blockAttributes leader must receive before blockAttributes write is considered successful. One of: 1 (default) One server needs to respond. 0 No servers need to respond. -1 Wait for all in-sync replicas to respond.",
Description: "The Number of acknowledgements a leader must receive before a write is considered successful. One of: 1 (default) One server needs to respond. 0 No servers need to respond. -1 Wait for all in-sync replicas to respond.",
},

"use_tls": {
Expand Down Expand Up @@ -92,7 +92,38 @@ func (h *KafkaServiceAttributeHandler) Register(s *schema.Resource) error {
"tls_hostname": {
Type: schema.TypeString,
Optional: true,
Description: "The hostname used to verify the server's certificate. It can either be the Common Name or blockAttributes Subject Alternative Name (SAN).",
Description: "The hostname used to verify the server's certificate. It can either be the Common Name or a Subject Alternative Name (SAN).",
},

"parse_log_keyvals": {
Type: schema.TypeBool,
Optional: true,
Default: false,
Description: "Enables parsing of key=value tuples from the beginning of a logline, turning them into record headers.",
},

"request_max_bytes": {
Type: schema.TypeInt,
Optional: true,
Description: "Maximum size of log batch, if non-zero. Defaults to 0 for unbounded.",
},

"auth_method": {
Type: schema.TypeString,
Optional: true,
Description: "SASL authentication method. One of: plain, scram-sha-256, scram-sha-512.",
},

"user": {
Type: schema.TypeString,
Optional: true,
Description: "SASL User.",
},

"password": {
Type: schema.TypeString,
Optional: true,
Description: "SASL Pass.",
},
}

Expand Down Expand Up @@ -250,6 +281,11 @@ func flattenKafka(kafkaList []*gofastly.Kafka) []map[string]interface{} {
"format_version": s.FormatVersion,
"placement": s.Placement,
"response_condition": s.ResponseCondition,
"parse_log_keyvals": s.ParseLogKeyvals,
"request_max_bytes": s.RequestMaxBytes,
"auth_method": s.AuthMethod,
"user": s.User,
"password": s.Password,
}

// prune any empty values that come from the default string value in structs
Expand Down Expand Up @@ -286,6 +322,11 @@ func (h *KafkaServiceAttributeHandler) buildCreate(kafkaMap interface{}, service
FormatVersion: uintOrDefault(vla.formatVersion),
Placement: vla.placement,
ResponseCondition: vla.responseCondition,
ParseLogKeyvals: gofastly.Compatibool(df["parse_log_keyvals"].(bool)),
RequestMaxBytes: uint(df["request_max_bytes"].(int)),
AuthMethod: df["auth_method"].(string),
User: df["user"].(string),
Password: df["password"].(string),
}
}

Expand Down
40 changes: 40 additions & 0 deletions fastly/block_fastly_service_v1_logging_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func TestResourceFastlyFlattenKafka(t *testing.T) {
Format: `%a %l %u %t %m %U%q %H %>s %b %T`,
FormatVersion: 2,
Placement: "none",
ParseLogKeyvals: true,
RequestMaxBytes: 12345,
AuthMethod: "scram-sha-512",
User: "user",
Password: "password",
},
},
local: []map[string]interface{}{
Expand All @@ -53,6 +58,11 @@ func TestResourceFastlyFlattenKafka(t *testing.T) {
"format": `%a %l %u %t %m %U%q %H %>s %b %T`,
"placement": "none",
"format_version": uint(2),
"parse_log_keyvals": true,
"request_max_bytes": uint(12345),
"auth_method": "scram-sha-512",
"user": "user",
"password": "password",
},
},
},
Expand Down Expand Up @@ -87,6 +97,11 @@ func TestAccFastlyServiceV1_kafkalogging_basic(t *testing.T) {
Format: `%a %l %u %t %m %U%q %H %>s %b %T`,
FormatVersion: 2,
Placement: "none",
ParseLogKeyvals: true,
RequestMaxBytes: 12345,
AuthMethod: "plain",
User: "user",
Password: "password",
}

log1_after_update := gofastly.Kafka{
Expand All @@ -105,6 +120,11 @@ func TestAccFastlyServiceV1_kafkalogging_basic(t *testing.T) {
Format: `%a %l %u %t %m %U%q %H %>s %b %T`,
FormatVersion: 2,
Placement: "waf_debug",
ParseLogKeyvals: true,
RequestMaxBytes: 12345,
AuthMethod: "scram-sha-256",
User: "user",
Password: "password",
}

log2 := gofastly.Kafka{
Expand All @@ -123,6 +143,11 @@ func TestAccFastlyServiceV1_kafkalogging_basic(t *testing.T) {
Format: `%a %l %u %t %m %U%q %H %>s %b %T`,
FormatVersion: 2,
Placement: "none",
ParseLogKeyvals: true,
RequestMaxBytes: 12345,
AuthMethod: "scram-sha-256",
User: "user",
Password: "password",
}

resource.Test(t, resource.TestCase{
Expand Down Expand Up @@ -327,6 +352,11 @@ resource "fastly_service_v1" "foo" {
format = "%%a %%l %%u %%t %%m %%U%%q %%H %%>s %%b %%T"
format_version = 2
placement = "none"
parse_log_keyvals = true
request_max_bytes = 12345
auth_method = "plain"
user = "user"
password = "password"
}
force_destroy = true
Expand Down Expand Up @@ -371,6 +401,11 @@ resource "fastly_service_v1" "foo" {
format = "%%a %%l %%u %%t %%m %%U%%q %%H %%>s %%b %%T"
format_version = 2
placement = "waf_debug"
parse_log_keyvals = true
request_max_bytes = 12345
auth_method = "scram-sha-256"
user = "user"
password = "password"
}
logging_kafka {
Expand All @@ -388,6 +423,11 @@ resource "fastly_service_v1" "foo" {
format = "%%a %%l %%u %%t %%m %%U%%q %%H %%>s %%b %%T"
format_version = 2
placement = "none"
parse_log_keyvals = true
request_max_bytes = 12345
auth_method = "scram-sha-256"
user = "user"
password = "password"
}
force_destroy = true
Expand Down

0 comments on commit 63ac643

Please sign in to comment.