Skip to content

Commit

Permalink
Adding tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 9, 2024
1 parent fd17710 commit 81922e7
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 3 deletions.
5 changes: 2 additions & 3 deletions lib/kafkalib/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ func (c Connection) Dialer(ctx context.Context) (*kafka.Dialer, error) {
}

dialer.SASLMechanism = aws_msk_iam_v2.NewMechanism(_awsCfg)
if !c.disableTLS {
dialer.TLS = &tls.Config{}
}
// We don't need to disable TLS for AWS IAM since MSK will always enable TLS.
dialer.TLS = &tls.Config{}
case Plain:
// No mechanism
default:
Expand Down
69 changes: 69 additions & 0 deletions lib/kafkalib/connection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package kafkalib

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestConnection_Mechanism(t *testing.T) {
{
c := NewConnection(false, false, "", "")
assert.Equal(t, Plain, c.Mechanism())
}
{
c := NewConnection(false, false, "username", "password")
assert.Equal(t, ScramSha512, c.Mechanism())

// AWS MSK IAM is enabled, but SCRAM is preferred
c = NewConnection(true, false, "username", "password")
assert.Equal(t, ScramSha512, c.Mechanism())
}
{
c := NewConnection(true, false, "", "")
assert.Equal(t, AwsMskIam, c.Mechanism())
}
}

func TestConnection_Dialer(t *testing.T) {
ctx := context.Background()
{
// Plain
c := NewConnection(false, false, "", "")
dialer, err := c.Dialer(ctx)
assert.NoError(t, err)
assert.Nil(t, dialer.TLS)
assert.Nil(t, dialer.SASLMechanism)
}
{
// SCRAM enabled with TLS
c := NewConnection(false, false, "username", "password")
dialer, err := c.Dialer(ctx)
assert.NoError(t, err)
assert.NotNil(t, dialer.TLS)
assert.NotNil(t, dialer.SASLMechanism)

// w/o TLS
c = NewConnection(false, true, "username", "password")
dialer, err = c.Dialer(ctx)
assert.NoError(t, err)
assert.Nil(t, dialer.TLS)
assert.NotNil(t, dialer.SASLMechanism)
}
{
// AWS IAM w/ TLS
c := NewConnection(true, false, "", "")
dialer, err := c.Dialer(ctx)
assert.NoError(t, err)
assert.NotNil(t, dialer.TLS)
assert.NotNil(t, dialer.SASLMechanism)

// w/o TLS (still enabled because AWS doesn't support not having TLS)
c = NewConnection(true, true, "", "")
dialer, err = c.Dialer(ctx)
assert.NoError(t, err)
assert.NotNil(t, dialer.TLS)
assert.NotNil(t, dialer.SASLMechanism)
}
}

0 comments on commit 81922e7

Please sign in to comment.