Skip to content

Commit

Permalink
Merge branch 'master' into native-support-timestamp-w-timezone
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Jul 9, 2024
2 parents 8774495 + 677f7fe commit fd0a455
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 6 deletions.
31 changes: 28 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,36 @@ import (
)

type Kafka struct {
// Required
BootstrapServers string `yaml:"bootstrapServers"`
TopicPrefix string `yaml:"topicPrefix"`
AwsEnabled bool `yaml:"awsEnabled"`
PublishSize uint `yaml:"publishSize,omitempty"`
MaxRequestSize uint64 `yaml:"maxRequestSize,omitempty"`
// Optional
AwsEnabled bool `yaml:"awsEnabled,omitempty"`
PublishSize uint `yaml:"publishSize,omitempty"`
MaxRequestSize uint64 `yaml:"maxRequestSize,omitempty"`
// If username and password are passed in, we'll use SCRAM w/ SHA512.
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
}

type Mechanism string

const (
None Mechanism = ""
ScramSha512 Mechanism = "SCRAM-SHA-512"
AwsMskIam Mechanism = "AWS-MSK-IAM"
)

func (k *Kafka) Mechanism() Mechanism {
if k.Username != "" && k.Password != "" {
return ScramSha512
}

if k.AwsEnabled {
return AwsMskIam
}

return None
}

func (k *Kafka) BootstrapAddresses() []string {
Expand Down
23 changes: 21 additions & 2 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
"time"

awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
"github.com/segmentio/kafka-go/sasl/scram"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/transfer/lib/jitter"
"github.com/artie-labs/transfer/lib/size"
"github.com/segmentio/kafka-go"
)

const (
Expand All @@ -38,7 +39,10 @@ func newWriter(ctx context.Context, cfg config.Kafka) (*kafka.Writer, error) {
writer.BatchBytes = int64(cfg.MaxRequestSize)
}

if cfg.AwsEnabled {
switch cfg.Mechanism() {
case config.AwsMskIam:
// If using AWS MSK IAM, we expect this to be set in the ENV VAR
// (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION, or the AWS Profile should be called default.)
saslCfg, err := awsCfg.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load AWS configuration: %w", err)
Expand All @@ -49,6 +53,21 @@ func newWriter(ctx context.Context, cfg config.Kafka) (*kafka.Writer, error) {
SASL: aws_msk_iam_v2.NewMechanism(saslCfg),
TLS: &tls.Config{},
}
case config.ScramSha512:
mechanism, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password)
if err != nil {
return nil, fmt.Errorf("failed to create scram mechanism: %w", err)
}

writer.Transport = &kafka.Transport{
DialTimeout: 10 * time.Second,
SASL: mechanism,
TLS: &tls.Config{},
}
case config.None:
// No mechanism
default:
return nil, fmt.Errorf("unsupported kafka mechanism: %s", cfg.Mechanism())
}

return writer, nil
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ func buildDestinationWriter(ctx context.Context, cfg *config.Settings, statsD mt
if kafkaCfg == nil {
return nil, fmt.Errorf("kafka configuration is not set")
}

slog.Info("Kafka config",
slog.Bool("aws", kafkaCfg.AwsEnabled),
slog.Any("authMechanism", kafkaCfg.Mechanism()),
slog.String("kafkaBootstrapServer", kafkaCfg.BootstrapServers),
slog.Any("publishSize", kafkaCfg.GetPublishSize()),
slog.Uint64("maxRequestSize", kafkaCfg.MaxRequestSize),
Expand Down

0 comments on commit fd0a455

Please sign in to comment.