Skip to content

Commit

Permalink
Clean up imports
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 8, 2024
1 parent 993b087 commit c0645cd
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
)

func (k *Kafka) Mechanism() Mechanism {
if k.Username != "" {
if k.Username != "" && k.Password != "" {
return ScramSha512
}

Expand Down
24 changes: 12 additions & 12 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
"github.com/segmentio/kafka-go/sasl/scram"

Expand All @@ -18,7 +19,6 @@ import (
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/transfer/lib/jitter"
"github.com/artie-labs/transfer/lib/size"
"github.com/segmentio/kafka-go"
)

const (
Expand All @@ -41,29 +41,29 @@ func newWriter(ctx context.Context, cfg config.Kafka) (*kafka.Writer, error) {
}

switch cfg.Mechanism() {
case config.ScramSha512:
// If username and password are provided, we'll use SCRAM w/ SHA512.
mechanism, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password)
case config.AwsMskIam:
// If using AWS MSK IAM, we expect this to be set in the ENV VAR
// (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION, or the AWS Profile should be called default.)
saslCfg, err := awsCfg.LoadDefaultConfig(ctx)
if err != nil {
logger.Panic("Failed to create SCRAM mechanism", slog.Any("err", err))
return nil, fmt.Errorf("failed to load AWS configuration: %w", err)
}

writer.Transport = &kafka.Transport{
DialTimeout: 10 * time.Second,
SASL: mechanism,
SASL: aws_msk_iam_v2.NewMechanism(saslCfg),
TLS: &tls.Config{},
}
case config.AwsMskIam:
// If using AWS MSK IAM, we expect this to be set in the ENV VAR
// (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION, or the AWS Profile should be called default.)
saslCfg, err := awsCfg.LoadDefaultConfig(ctx)
case config.ScramSha512:
// If username and password are provided, we'll use SCRAM w/ SHA512.
mechanism, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password)
if err != nil {
return nil, fmt.Errorf("failed to load AWS configuration: %w", err)
logger.Panic("Failed to create SCRAM mechanism", slog.Any("err", err))
}

writer.Transport = &kafka.Transport{
DialTimeout: 10 * time.Second,
SASL: aws_msk_iam_v2.NewMechanism(saslCfg),
SASL: mechanism,
TLS: &tls.Config{},
}
}
Expand Down

0 comments on commit c0645cd

Please sign in to comment.