-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fail Stream Replicator on startup if source or target isn't reachable #205
base: master
Are you sure you want to change the base?
Conversation
Duplicate of #183 |
Responded to comment about a removed unit test here https://github.com/snowplow-devops/stream-replicator/pull/183/files/adb452ba5f917aa0fc01e8f02a2cc25661c33ef5..5e9cf7c2dbc230c4449093f48efb17ab1cf305e4#r944267068 |
@@ -220,6 +227,37 @@ func TestHttpWrite_Concurrent(t *testing.T) { | |||
assert.Equal(int64(10), ackOps) | |||
} | |||
|
|||
func TestHttpWrite_RequestFailure(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added test back after discussion here: #183 (comment)
@@ -32,6 +32,13 @@ func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Se | |||
panic(err) | |||
} | |||
mutex.Lock() | |||
if req.URL.Path == `/error` { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added after discussion here: #183 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, this is clean and gives us better coverage of the cases we want.
@@ -65,7 +64,9 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) { | |||
assert.Nil(err) | |||
|
|||
err = source.Read(nil) | |||
assert.True(strings.HasPrefix(err.Error(), `Failed to get SQS queue URL`)) | |||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added after discussion here: #183 (comment)
Besides the fixed I've done I have responded to some comments on #183 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we're missing the following:
- kinesis source
- kafka target
- kinesis target
- sqs target
Some of them have tests already written but commented out, some need tests to be written.
Well that's good- less work involved. I think every source and target needs a test for this - so as well as uncommenting the ones that are there already, if there's any that don't have tests for any, then let's add them. |
Added checks (or unit tests if check exists) for all sources and targets. Also updated PR desc. |
@@ -243,7 +249,7 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error { | |||
case <-time.After(10 * time.Second): | |||
// Append errors and crash | |||
multierror.Append(kinesisPullErr, errors.Errorf("wg.Wait() took too long, forcing app close.")) | |||
ks.log.WithFields(log.Fields{"error": err}).Fatal(err) | |||
ks.log.WithFields(log.Fields{"error": kinesisPullErr}).Fatal(kinesisPullErr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noticed randomly that the error we logged here was the wrong one, fixed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good spot - Can we pull this into its own issue for auditability - it's a distinct bug which might be important to some debugging investigation in future - so I'd prefer it to have its own audit trail.
(Doesn't need a separate PR though, just its own commit and issue within this PR is fine)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created issue here: #209 and a separate commit here for it.
5b99e66
to
b5f0154
Compare
b5f0154
to
bfaa707
Compare
|
||
assert.Nil(source) | ||
assert.EqualError(err, `Failed to create Kinsumer client: leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency`) | ||
|
||
if testing.Short() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy paste error here I think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, weird how that got there
pkg/source/sqs/sqs_source_test.go
Outdated
@@ -61,10 +74,14 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) { | |||
|
|||
source, err := newSQSSourceWithInterfaces(client, "00000000000", 10, testutil.AWSLocalstackRegion, "nonexistent-queue") | |||
|
|||
assert.Nil(source) | |||
assert.NotNil(err) | |||
assert.NotNil(source) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is still the opposite behaviour to what we want: #183 (comment)
If the source can't be reached then newSQSSourceWithInterfaces
should throw an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a call to GetQueueUrl
inside newSQSSourceWithInterfaces
which should fail if there is no connection to SQS.
pkg/source/sqs/sqs_source_test.go
Outdated
@@ -30,6 +30,20 @@ func TestMain(m *testing.M) { | |||
os.Exit(exitVal) | |||
} | |||
|
|||
// TestNewSqsSource_ConnectionCheck tests that the SQS source fails on start-up if the connection to SQS fails | |||
func TestNewSqsSource_ConnectionCheck(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't test a connection check with the SQS queue. It tests if an AWS client session can be created.
We can have a valid AWS session and without having a valid connection to the SQS queue, but not vice-versa.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, this tests the connection to AWS. I am now testing the case where a connection to SQS cannot be done in TestNewSQSSourceWithInterfaces_Failure
@@ -15,6 +15,19 @@ import ( | |||
"github.com/snowplow-devops/stream-replicator/pkg/testutil" | |||
) | |||
|
|||
// TestNewKinesisTarget_ConnectionCheck tests that the Kinesis target fails on start-up if the connection to Kinesis fails | |||
func TestNewKinesisTarget_ConnectionCheck(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as my coment for the sqs source - there's no check for a connection to a stream, this failure only looks at the AWS session.
I don't see anywhere in the kinesis target codebase where we're doing a connection check at all
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just added a DescribeStream
call to the Kinesis target too. The case where the connection to Kinesis could not be done is now tested in TestKinesisTarget_WriteFailure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure you've pushed all your changes? I can't see any TestKinesisTarget_WriteFailure
test. I can see you've renamed it and replaced it with TestNewKinesisTarget_ConnectionCheck
and TestKinesisTarget_KinesisConnectionFailure
, neither of which appear to test this feature.
pkg/target/sqs_test.go
Outdated
@@ -15,6 +15,15 @@ import ( | |||
"github.com/snowplow-devops/stream-replicator/pkg/testutil" | |||
) | |||
|
|||
// TestNewSqsTarget_ConnectionCheck tests that the SQS target fails on start-up if the connection to SQS fails | |||
func TestNewSqsTarget_ConnectionCheck(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as for the sqs source and the kinesis target.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as in the SQS source, I added a quick call to GetQueueUrl
in newSQSTargetWithInterfaces
so we test the connection to SQS instead of AWS.
e12d862
to
a1f30ad
Compare
420d2ca
to
d1aa932
Compare
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID()) | ||
|
||
err = source.Read(nil) | ||
assert.NotNil(err) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test will pass when we don't throw any error. It needs to assert against the error existing first.
if testing.Short() { | ||
t.Skip("skipping integration test") | ||
} | ||
|
||
assert := assert.New(t) | ||
|
||
client := testutil.GetAWSLocalstackKinesisClient() | ||
client := kinesis.New(session.Must(session.NewSession(&aws.Config{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why you're manually creating another new kinesis client here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please stop force pushing before requested changes have been reviewed. It just creates needless work and makes reviews of your PRs take longer than they need to.
It's easier for everyone to squash once at the end, when the entire review is done.
I'm gonna reopen this and move it to draft - there's been a lot of messing around in between but I still plan on picking this up from where we left off and pulling it into an upcoming release! |
dd7c31c
to
e4c848a
Compare
Added connection checks to: