From 814dfd80a69ccb1fd67833652f7372f21f2d3b02 Mon Sep 17 00:00:00 2001 From: Lei Wang <66336933+wangzlei@users.noreply.github.com> Date: Mon, 12 Apr 2021 21:36:37 -0700 Subject: [PATCH] check unprocessed segment id is nill (#122) * check unprocessed segment id is nill * update logging --- pkg/processor/batchprocessor.go | 8 ++++++- pkg/processor/batchprocessor_test.go | 34 ++++++++++++++++++++++++---- pkg/processor/processor.go | 4 ++-- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/pkg/processor/batchprocessor.go b/pkg/processor/batchprocessor.go index 2c91da3..bb54fe1 100644 --- a/pkg/processor/batchprocessor.go +++ b/pkg/processor/batchprocessor.go @@ -101,13 +101,19 @@ func (s *segmentsBatch) poll() { for i := 0; i < len(batch); i++ { segIdStrs := segIdRegexp.FindStringSubmatch(*batch[i]) if len(segIdStrs) != 2 { - log.Debugf("Failed to match \"id\" in segment: ", *batch[i]) + log.Debugf("Failed to match \"id\" in segment: %v", *batch[i]) continue } batchesMap[segIdStrs[1]] = *batch[i] } for _, unprocessedSegment := range r.UnprocessedTraceSegments { telemetry.T.SegmentRejected(1) + // Print all segments since don't know which exact one is invalid. + if unprocessedSegment.Id == nil { + log.Debugf("Received nil unprocessed segment id from X-Ray service: %v", unprocessedSegment) + log.Debugf("Content in this batch: %v", params) + break + } traceIdStrs := traceIdRegexp.FindStringSubmatch(batchesMap[*unprocessedSegment.Id]) if len(traceIdStrs) != 2 { log.Errorf("Unprocessed segment: %v", unprocessedSegment) diff --git a/pkg/processor/batchprocessor_test.go b/pkg/processor/batchprocessor_test.go index 2de1a93..8c6fcf5 100644 --- a/pkg/processor/batchprocessor_test.go +++ b/pkg/processor/batchprocessor_test.go @@ -41,6 +41,8 @@ func (c *MockXRayClient) PutTraceSegments(input *xray.PutTraceSegmentsInput) (*x if errorStr == "Send unprocessed" { segmentID := "Test-Segment-Id-1242113" output.UnprocessedTraceSegments = append(output.UnprocessedTraceSegments, &xray.UnprocessedTraceSegment{Id: &segmentID}) + } else if errorStr == "Send Invalid" { + output.UnprocessedTraceSegments = append(output.UnprocessedTraceSegments, &xray.UnprocessedTraceSegment{Id: nil}) } else if errorStr != "" { err = errors.New(errorStr) } @@ -110,7 +112,7 @@ func TestPollSendSuccess(t *testing.T) { assert.True(t, strings.Contains(log.Logs[1], doneMsg)) } -func TestPoolSendFailedOnceMoreThanMin(t *testing.T) { +func TestPollSendFailedOnceMoreThanMin(t *testing.T) { seed := int64(122321) randGen := rand.New(rand.NewSource(seed)) timer := test.MockTimerClient{} @@ -150,7 +152,7 @@ func TestPoolSendFailedOnceMoreThanMin(t *testing.T) { assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg)) } -func TestPoolSendFailedTwiceMoreThanMin(t *testing.T) { +func TestPollSendFailedTwiceMoreThanMin(t *testing.T) { seed := int64(122321) randGen := rand.New(rand.NewSource(seed)) timer := test.MockTimerClient{} @@ -198,7 +200,7 @@ func TestPoolSendFailedTwiceMoreThanMin(t *testing.T) { assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg)) } -func TestPoolSendFailedTwiceAndSucceedThird(t *testing.T) { +func TestPollSendFailedTwiceAndSucceedThird(t *testing.T) { seed := int64(122321) randGen := rand.New(rand.NewSource(seed)) timer := test.MockTimerClient{} @@ -291,7 +293,7 @@ func TestPutTraceSegmentsParameters(t *testing.T) { assert.True(t, strings.Contains(log.Logs[1], doneMsg)) } -func TestPoolSendReturnUnprocessed(t *testing.T) { +func TestPollSendReturnUnprocessed(t *testing.T) { log := test.LogSetup() xRay := new(MockXRayClient) xRay.On("PutTraceSegments", nil).Return("Send unprocessed").Once() @@ -309,10 +311,32 @@ func TestPoolSendReturnUnprocessed(t *testing.T) { <-s.done assert.EqualValues(t, xRay.CallNoToPutTraceSegments, 1) - assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1)) ) + assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1))) assert.True(t, strings.Contains(log.Logs[1], "Unprocessed segment")) } +func TestPollSendReturnUnprocessedInvalid(t *testing.T) { + log := test.LogSetup() + xRay := new(MockXRayClient) + xRay.On("PutTraceSegments", nil).Return("Send Invalid").Once() + s := segmentsBatch{ + batches: make(chan []*string, 1), + xRay: xRay, + done: make(chan bool), + } + testMessage := "{\"id\":\"9472\"" + batch := []*string{&testMessage} + s.send(batch) + + go s.poll() + close(s.batches) + <-s.done + + assert.EqualValues(t, xRay.CallNoToPutTraceSegments, 1) + assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1))) + assert.True(t, strings.Contains(log.Logs[1], "Received nil unprocessed segment id from X-Ray service")) +} + type minTestCase struct { x int32 y int32 diff --git a/pkg/processor/processor.go b/pkg/processor/processor.go index 950af2f..be5ee19 100644 --- a/pkg/processor/processor.go +++ b/pkg/processor/processor.go @@ -19,11 +19,11 @@ import ( "github.com/aws/aws-xray-daemon/pkg/ringbuffer" "github.com/aws/aws-xray-daemon/pkg/tracesegment" - "math/rand" - "os" "github.com/aws/aws-xray-daemon/pkg/cfg" "github.com/aws/aws-xray-daemon/pkg/conn" "github.com/aws/aws-xray-daemon/pkg/util/timer" + "math/rand" + "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session"