diff --git a/config/config.go b/config/config.go index 54143f07..c9117621 100644 --- a/config/config.go +++ b/config/config.go @@ -34,7 +34,7 @@ const ( ) func (k *Kafka) Mechanism() Mechanism { - if k.Username != "" { + if k.Username != "" && k.Password != "" { return ScramSha512 } diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index db45b326..9b57e29c 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -9,6 +9,7 @@ import ( "time" awsCfg "github.com/aws/aws-sdk-go-v2/config" + "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2" "github.com/segmentio/kafka-go/sasl/scram" @@ -18,7 +19,6 @@ import ( "github.com/artie-labs/reader/lib/mtr" "github.com/artie-labs/transfer/lib/jitter" "github.com/artie-labs/transfer/lib/size" - "github.com/segmentio/kafka-go" ) const ( @@ -41,29 +41,29 @@ func newWriter(ctx context.Context, cfg config.Kafka) (*kafka.Writer, error) { } switch cfg.Mechanism() { - case config.ScramSha512: - // If username and password are provided, we'll use SCRAM w/ SHA512. - mechanism, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password) + case config.AwsMskIam: + // If using AWS MSK IAM, we expect this to be set in the ENV VAR + // (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION, or the AWS Profile should be called default.) + saslCfg, err := awsCfg.LoadDefaultConfig(ctx) if err != nil { - logger.Panic("Failed to create SCRAM mechanism", slog.Any("err", err)) + return nil, fmt.Errorf("failed to load AWS configuration: %w", err) } writer.Transport = &kafka.Transport{ DialTimeout: 10 * time.Second, - SASL: mechanism, + SASL: aws_msk_iam_v2.NewMechanism(saslCfg), TLS: &tls.Config{}, } - case config.AwsMskIam: - // If using AWS MSK IAM, we expect this to be set in the ENV VAR - // (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION, or the AWS Profile should be called default.) - saslCfg, err := awsCfg.LoadDefaultConfig(ctx) + case config.ScramSha512: + // If username and password are provided, we'll use SCRAM w/ SHA512. + mechanism, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password) if err != nil { - return nil, fmt.Errorf("failed to load AWS configuration: %w", err) + logger.Panic("Failed to create SCRAM mechanism", slog.Any("err", err)) } writer.Transport = &kafka.Transport{ DialTimeout: 10 * time.Second, - SASL: aws_msk_iam_v2.NewMechanism(saslCfg), + SASL: mechanism, TLS: &tls.Config{}, } }