Fail Stream Replicator on startup if source or target isn't reachable #205

Draft
wants to merge 6 commits into
base: master
from

Conversation

TiganeteaRobert
Contributor

@TiganeteaRobert TiganeteaRobert commented Aug 22, 2022

Added connection checks to:

  • PubSub source (also added check for existing subscription in project)
  • Kinesis source (added DescribeStream to check for connection and test)
  • SQS source (check existing, added unit test)

  • PubSub target (also added check for existing topic in project)
  • Eventhub target (added check for runtime info)
  • HTTP target (added HEAD request on start-up to check for a connection to the host)
  • Kinesis target (check existing, added unit test)
  • Kafka target (check existing, added unit test)
  • SQS target (check existing, added unit test)
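As an illustration of the HTTP target item above, here is a minimal sketch of a start-up connection check built on a HEAD request. It is not the code from this PR: `checkHTTPTarget` and `demo` are hypothetical names, the two-second timeout is arbitrary, and ignoring the response status code is an assumption about what "a connection to the host" means.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// checkHTTPTarget is a hypothetical helper, not the PR's code. It sends a HEAD
// request and returns an error if no connection to the host can be made.
// The response status code is deliberately ignored (an assumption).
func checkHTTPTarget(url string) error {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Head(url)
	if err != nil {
		return fmt.Errorf("connection check failed: %w", err)
	}
	return resp.Body.Close()
}

// demo runs the check against a local test server, first while it is up and
// then after it has been closed.
func demo() (whileUp error, afterClose error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	url := srv.URL
	whileUp = checkHTTPTarget(url)
	srv.Close()
	afterClose = checkHTTPTarget(url)
	return whileUp, afterClose
}

func main() {
	whileUp, afterClose := demo()
	fmt.Println("server up, check error:", whileUp)
	fmt.Println("server closed, check failed:", afterClose != nil)
}
```

Calling a check like this from the target's constructor, and returning its error, is one way to make the application fail at start-up rather than on the first write.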

@TiganeteaRobert
Contributor Author

Duplicate of #183

@TiganeteaRobert TiganeteaRobert marked this as a duplicate of #183 Aug 22, 2022
@TiganeteaRobert
Contributor Author

@@ -220,6 +227,37 @@ func TestHttpWrite_Concurrent(t *testing.T) {
assert.Equal(int64(10), ackOps)
}

func TestHttpWrite_RequestFailure(t *testing.T) {
Contributor Author

Added test back after discussion here: #183 (comment)

@@ -32,6 +32,13 @@ func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Se
panic(err)
}
mutex.Lock()
if req.URL.Path == `/error` {
Contributor Author

Added after discussion here: #183 (comment)

Collaborator

Nice, this is clean and gives us better coverage of the cases we want.

@@ -65,7 +64,9 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) {
assert.Nil(err)

err = source.Read(nil)
assert.True(strings.HasPrefix(err.Error(), `Failed to get SQS queue URL`))
if err != nil {
Contributor Author

Added after discussion here: #183 (comment)

@TiganeteaRobert
Contributor Author

Besides the fixes I've made, I have also responded to some comments on #183.

Collaborator

@colmsnowplow colmsnowplow left a comment

Looks like we're missing the following:

  • kinesis source
  • kafka target
  • kinesis target
  • sqs target

Some of them have tests already written but commented out, some need tests to be written.

@colmsnowplow
Collaborator

colmsnowplow commented Aug 23, 2022

Well, that's good - less work involved.

I think every source and target needs a test for this - so as well as uncommenting the ones that are there already, if any sources or targets don't have tests, let's add them.

@TiganeteaRobert
Contributor Author

Added checks (or unit tests if check exists) for all sources and targets. Also updated PR desc.

@@ -243,7 +249,7 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error {
case <-time.After(10 * time.Second):
// Append errors and crash
multierror.Append(kinesisPullErr, errors.Errorf("wg.Wait() took too long, forcing app close."))
ks.log.WithFields(log.Fields{"error": err}).Fatal(err)
ks.log.WithFields(log.Fields{"error": kinesisPullErr}).Fatal(kinesisPullErr)
Contributor Author

Noticed randomly that the error we logged here was the wrong one, fixed it.

Collaborator

@colmsnowplow colmsnowplow Aug 23, 2022

Good spot - Can we pull this into its own issue for auditability - it's a distinct bug which might be important to some debugging investigation in future - so I'd prefer it to have its own audit trail.

(Doesn't need a separate PR though, just its own commit and issue within this PR is fine)

Contributor Author

👍

Contributor Author

Created issue here: #209 and a separate commit here for it.
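For readers following the logging fix in this thread, the sketch below shows the general pattern of combining a timeout error with previously collected errors and logging the combined value. It swaps the `multierror` package used in the diff for the standard library's `errors.Join` (Go 1.20+), and `shutdownError` and `errHypotheticalPull` are illustrative names that do not appear in the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// errHypotheticalPull stands in for an error collected earlier while pulling
// records. It exists only for this sketch.
var errHypotheticalPull = errors.New("hypothetical pull failure")

// shutdownError combines any previously collected error with the timeout
// error, so that the caller logs everything that went wrong.
// errors.Join drops nil values, so pullErr may be nil.
func shutdownError(pullErr error) error {
	timeoutErr := errors.New("wg.Wait() took too long, forcing app close")
	return errors.Join(pullErr, timeoutErr)
}

func main() {
	combined := shutdownError(errHypotheticalPull)
	// Log the combined error rather than an unrelated variable.
	fmt.Println(combined)
	fmt.Println(errors.Is(combined, errHypotheticalPull))
}
```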


assert.Nil(source)
assert.EqualError(err, `Failed to create Kinsumer client: leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency`)

if testing.Short() {
Collaborator

copy paste error here I think?

Contributor Author

Yup, weird how that got there

@@ -61,10 +74,14 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) {

source, err := newSQSSourceWithInterfaces(client, "00000000000", 10, testutil.AWSLocalstackRegion, "nonexistent-queue")

assert.Nil(source)
assert.NotNil(err)
assert.NotNil(source)
Collaborator

This is still the opposite behaviour to what we want: #183 (comment)

If the source can't be reached then newSQSSourceWithInterfaces should return an error (and a nil source).

Contributor Author

I added a call to GetQueueUrl inside newSQSSourceWithInterfaces which should fail if there is no connection to SQS.
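A rough sketch of the behaviour being asked for in this thread: the constructor resolves the queue URL up front and returns a nil source plus an error when the lookup fails. Everything here is hypothetical and independent of the AWS SDK: `queueAPI`, `queueSource`, `newQueueSource` and `fakeQueues` are stand-ins invented for the example, and only the error prefix is taken from the test shown earlier in this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// queueAPI is a hypothetical, minimal stand-in for the part of an SQS client
// that the constructor needs.
type queueAPI interface {
	GetQueueURL(queueName string) (string, error)
}

// queueSource is a hypothetical source that holds a resolved queue URL.
type queueSource struct {
	queueURL string
}

// newQueueSource resolves the queue URL immediately and returns (nil, err)
// when the lookup fails, so an unreachable queue stops start-up.
func newQueueSource(client queueAPI, queueName string) (*queueSource, error) {
	url, err := client.GetQueueURL(queueName)
	if err != nil {
		return nil, fmt.Errorf("Failed to get SQS queue URL: %w", err)
	}
	return &queueSource{queueURL: url}, nil
}

// fakeQueues is an in-memory fake used only for this sketch.
type fakeQueues map[string]string

func (f fakeQueues) GetQueueURL(name string) (string, error) {
	url, ok := f[name]
	if !ok {
		return "", errors.New("queue does not exist")
	}
	return url, nil
}

func main() {
	client := fakeQueues{"existing-queue": "http://localhost:4566/000000000000/existing-queue"}

	source, err := newQueueSource(client, "nonexistent-queue")
	fmt.Println(source == nil, err)
}
```

With this shape, a test for the failure case can assert that the source is nil and that the error is non-nil, which is the combination requested in the review.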

@@ -30,6 +30,20 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}

// TestNewSqsSource_ConnectionCheck tests that the SQS source fails on start-up if the connection to SQS fails
func TestNewSqsSource_ConnectionCheck(t *testing.T) {
Collaborator

This doesn't test a connection check with the SQS queue. It tests if an AWS client session can be created.

We can have a valid AWS session without having a valid connection to the SQS queue, but not vice versa.

Contributor Author

Indeed, this tests the connection to AWS. I am now testing the case where a connection to SQS cannot be established in TestNewSQSSourceWithInterfaces_Failure.

@@ -15,6 +15,19 @@ import (
"github.com/snowplow-devops/stream-replicator/pkg/testutil"
)

// TestNewKinesisTarget_ConnectionCheck tests that the Kinesis target fails on start-up if the connection to Kinesis fails
func TestNewKinesisTarget_ConnectionCheck(t *testing.T) {
Collaborator

Same as my comment for the sqs source - there's no check for a connection to a stream, this failure only looks at the AWS session.

I don't see anywhere in the kinesis target codebase where we're doing a connection check at all

Contributor Author

I just added a DescribeStream call to the Kinesis target too. The case where the connection to Kinesis cannot be established is now tested in TestKinesisTarget_WriteFailure.

Collaborator

Are you sure you've pushed all your changes? I can't see any TestKinesisTarget_WriteFailure test. I can see you've renamed it and replaced it with TestNewKinesisTarget_ConnectionCheck and TestKinesisTarget_KinesisConnectionFailure, neither of which appear to test this feature.

@@ -15,6 +15,15 @@ import (
"github.com/snowplow-devops/stream-replicator/pkg/testutil"
)

// TestNewSqsTarget_ConnectionCheck tests that the SQS target fails on start-up if the connection to SQS fails
func TestNewSqsTarget_ConnectionCheck(t *testing.T) {
Collaborator

Same comment as for the sqs source and the kinesis target.

Contributor Author

Same as in the SQS source, I added a quick call to GetQueueUrl in newSQSTargetWithInterfaces so we test the connection to SQS instead of AWS.

assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID())

err = source.Read(nil)
assert.NotNil(err)
if err != nil {
Collaborator

This test will pass when we don't throw any error. It needs to assert against the error existing first.
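To make the point above concrete, here is a small sketch contrasting a check that only inspects the error when one exists with a check that requires the error first. `vacuousCheck`, `strictCheck` and `errSample` are hypothetical names for illustration, and the message prefix is borrowed from the SQS test elsewhere in this PR rather than from the Kinesis test under review.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errSample is a hypothetical error used only for this sketch.
var errSample = errors.New("Failed to get SQS queue URL: hypothetical cause")

// vacuousCheck mirrors the pattern under review: the message is only
// inspected when err is non-nil, so a nil error slips through as a pass.
func vacuousCheck(err error, prefix string) bool {
	if err != nil {
		return strings.HasPrefix(err.Error(), prefix)
	}
	return true
}

// strictCheck requires the error to exist before inspecting its message.
func strictCheck(err error, prefix string) bool {
	if err == nil {
		return false
	}
	return strings.HasPrefix(err.Error(), prefix)
}

func main() {
	const prefix = "Failed to get SQS queue URL"
	fmt.Println(vacuousCheck(nil, prefix))      // true: nothing was verified
	fmt.Println(strictCheck(nil, prefix))       // false: a missing error is a failure
	fmt.Println(strictCheck(errSample, prefix)) // true
}
```

In a testify-based test, the equivalent is to assert that the error is non-nil (for example with `assert.NotNil(err)`) before any assertion on its message.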

if testing.Short() {
t.Skip("skipping integration test")
}

assert := assert.New(t)

client := testutil.GetAWSLocalstackKinesisClient()
client := kinesis.New(session.Must(session.NewSession(&aws.Config{
Collaborator

I don't understand why you're manually creating another new kinesis client here?

Collaborator

@colmsnowplow colmsnowplow left a comment

Please stop force pushing before requested changes have been reviewed. It just creates needless work and makes reviews of your PRs take longer than they need to.

It's easier for everyone to squash once at the end, when the entire review is done.

@colmsnowplow
Collaborator

I'm gonna reopen this and move it to draft - there's been a lot of messing around in between but I still plan on picking this up from where we left off and pulling it into an upcoming release!

@colmsnowplow colmsnowplow reopened this Nov 11, 2022
@colmsnowplow colmsnowplow marked this pull request as draft November 11, 2022 10:39
@colmsnowplow colmsnowplow force-pushed the develop branch 3 times, most recently from dd7c31c to e4c848a Compare April 15, 2024 10:49
Base automatically changed from develop to master April 15, 2024 11:28