From 677f7fe9b371eee827ac41c4541fd6bf77e1caa4 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Mon, 8 Jul 2024 15:53:56 -0700 Subject: [PATCH] [Kafka] Supporting SCRAM-SHA-512 (#432) --- config/config.go | 31 ++++++++++++++++++++++++++++--- lib/kafkalib/writer.go | 23 +++++++++++++++++++++-- main.go | 3 ++- 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/config/config.go b/config/config.go index 2d82964a..c9117621 100644 --- a/config/config.go +++ b/config/config.go @@ -13,11 +13,36 @@ import ( ) type Kafka struct { + // Required BootstrapServers string `yaml:"bootstrapServers"` TopicPrefix string `yaml:"topicPrefix"` - AwsEnabled bool `yaml:"awsEnabled"` - PublishSize uint `yaml:"publishSize,omitempty"` - MaxRequestSize uint64 `yaml:"maxRequestSize,omitempty"` + // Optional + AwsEnabled bool `yaml:"awsEnabled,omitempty"` + PublishSize uint `yaml:"publishSize,omitempty"` + MaxRequestSize uint64 `yaml:"maxRequestSize,omitempty"` + // If username and password are passed in, we'll use SCRAM w/ SHA512. + Username string `yaml:"username,omitempty"` + Password string `yaml:"password,omitempty"` +} + +type Mechanism string + +const ( + None Mechanism = "" + ScramSha512 Mechanism = "SCRAM-SHA-512" + AwsMskIam Mechanism = "AWS-MSK-IAM" +) + +func (k *Kafka) Mechanism() Mechanism { + if k.Username != "" && k.Password != "" { + return ScramSha512 + } + + if k.AwsEnabled { + return AwsMskIam + } + + return None } func (k *Kafka) BootstrapAddresses() []string { diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index 68d03c9c..7efafad1 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -9,14 +9,15 @@ import ( "time" awsCfg "github.com/aws/aws-sdk-go-v2/config" + "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2" + "github.com/segmentio/kafka-go/sasl/scram" "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib" "github.com/artie-labs/reader/lib/mtr" "github.com/artie-labs/transfer/lib/jitter" "github.com/artie-labs/transfer/lib/size" - "github.com/segmentio/kafka-go" ) const ( @@ -38,7 +39,10 @@ func newWriter(ctx context.Context, cfg config.Kafka) (*kafka.Writer, error) { writer.BatchBytes = int64(cfg.MaxRequestSize) } - if cfg.AwsEnabled { + switch cfg.Mechanism() { + case config.AwsMskIam: + // If using AWS MSK IAM, we expect this to be set in the ENV VAR + // (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION, or the AWS Profile should be called default.) saslCfg, err := awsCfg.LoadDefaultConfig(ctx) if err != nil { return nil, fmt.Errorf("failed to load AWS configuration: %w", err) @@ -49,6 +53,21 @@ func newWriter(ctx context.Context, cfg config.Kafka) (*kafka.Writer, error) { SASL: aws_msk_iam_v2.NewMechanism(saslCfg), TLS: &tls.Config{}, } + case config.ScramSha512: + mechanism, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password) + if err != nil { + return nil, fmt.Errorf("failed to create scram mechanism: %w", err) + } + + writer.Transport = &kafka.Transport{ + DialTimeout: 10 * time.Second, + SASL: mechanism, + TLS: &tls.Config{}, + } + case config.None: + // No mechanism + default: + return nil, fmt.Errorf("unsupported kafka mechanism: %s", cfg.Mechanism()) } return writer, nil diff --git a/main.go b/main.go index 2beea45f..d640f0c9 100644 --- a/main.go +++ b/main.go @@ -62,8 +62,9 @@ func buildDestinationWriter(ctx context.Context, cfg *config.Settings, statsD mt if kafkaCfg == nil { return nil, fmt.Errorf("kafka configuration is not set") } + slog.Info("Kafka config", - slog.Bool("aws", kafkaCfg.AwsEnabled), + slog.Any("authMechanism", kafkaCfg.Mechanism()), slog.String("kafkaBootstrapServer", kafkaCfg.BootstrapServers), slog.Any("publishSize", kafkaCfg.GetPublishSize()), slog.Uint64("maxRequestSize", kafkaCfg.MaxRequestSize),