Skip to content

Commit

Permalink
check unprocessed segment id is nill (#122)
Browse files Browse the repository at this point in the history
* check unprocessed segment id is nill

* update logging
  • Loading branch information
wangzlei authored Apr 13, 2021
1 parent 863d7b4 commit 814dfd8
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
8 changes: 7 additions & 1 deletion pkg/processor/batchprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,19 @@ func (s *segmentsBatch) poll() {
for i := 0; i < len(batch); i++ {
segIdStrs := segIdRegexp.FindStringSubmatch(*batch[i])
if len(segIdStrs) != 2 {
log.Debugf("Failed to match \"id\" in segment: ", *batch[i])
log.Debugf("Failed to match \"id\" in segment: %v", *batch[i])
continue
}
batchesMap[segIdStrs[1]] = *batch[i]
}
for _, unprocessedSegment := range r.UnprocessedTraceSegments {
telemetry.T.SegmentRejected(1)
// Print all segments since don't know which exact one is invalid.
if unprocessedSegment.Id == nil {
log.Debugf("Received nil unprocessed segment id from X-Ray service: %v", unprocessedSegment)
log.Debugf("Content in this batch: %v", params)
break
}
traceIdStrs := traceIdRegexp.FindStringSubmatch(batchesMap[*unprocessedSegment.Id])
if len(traceIdStrs) != 2 {
log.Errorf("Unprocessed segment: %v", unprocessedSegment)
Expand Down
34 changes: 29 additions & 5 deletions pkg/processor/batchprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (c *MockXRayClient) PutTraceSegments(input *xray.PutTraceSegmentsInput) (*x
if errorStr == "Send unprocessed" {
segmentID := "Test-Segment-Id-1242113"
output.UnprocessedTraceSegments = append(output.UnprocessedTraceSegments, &xray.UnprocessedTraceSegment{Id: &segmentID})
} else if errorStr == "Send Invalid" {
output.UnprocessedTraceSegments = append(output.UnprocessedTraceSegments, &xray.UnprocessedTraceSegment{Id: nil})
} else if errorStr != "" {
err = errors.New(errorStr)
}
Expand Down Expand Up @@ -110,7 +112,7 @@ func TestPollSendSuccess(t *testing.T) {
assert.True(t, strings.Contains(log.Logs[1], doneMsg))
}

func TestPoolSendFailedOnceMoreThanMin(t *testing.T) {
func TestPollSendFailedOnceMoreThanMin(t *testing.T) {
seed := int64(122321)
randGen := rand.New(rand.NewSource(seed))
timer := test.MockTimerClient{}
Expand Down Expand Up @@ -150,7 +152,7 @@ func TestPoolSendFailedOnceMoreThanMin(t *testing.T) {
assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
}

func TestPoolSendFailedTwiceMoreThanMin(t *testing.T) {
func TestPollSendFailedTwiceMoreThanMin(t *testing.T) {
seed := int64(122321)
randGen := rand.New(rand.NewSource(seed))
timer := test.MockTimerClient{}
Expand Down Expand Up @@ -198,7 +200,7 @@ func TestPoolSendFailedTwiceMoreThanMin(t *testing.T) {
assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
}

func TestPoolSendFailedTwiceAndSucceedThird(t *testing.T) {
func TestPollSendFailedTwiceAndSucceedThird(t *testing.T) {
seed := int64(122321)
randGen := rand.New(rand.NewSource(seed))
timer := test.MockTimerClient{}
Expand Down Expand Up @@ -291,7 +293,7 @@ func TestPutTraceSegmentsParameters(t *testing.T) {
assert.True(t, strings.Contains(log.Logs[1], doneMsg))
}

func TestPoolSendReturnUnprocessed(t *testing.T) {
func TestPollSendReturnUnprocessed(t *testing.T) {
log := test.LogSetup()
xRay := new(MockXRayClient)
xRay.On("PutTraceSegments", nil).Return("Send unprocessed").Once()
Expand All @@ -309,10 +311,32 @@ func TestPoolSendReturnUnprocessed(t *testing.T) {
<-s.done

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, 1)
assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1)) )
assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1)))
assert.True(t, strings.Contains(log.Logs[1], "Unprocessed segment"))
}

func TestPollSendReturnUnprocessedInvalid(t *testing.T) {
log := test.LogSetup()
xRay := new(MockXRayClient)
xRay.On("PutTraceSegments", nil).Return("Send Invalid").Once()
s := segmentsBatch{
batches: make(chan []*string, 1),
xRay: xRay,
done: make(chan bool),
}
testMessage := "{\"id\":\"9472\""
batch := []*string{&testMessage}
s.send(batch)

go s.poll()
close(s.batches)
<-s.done

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, 1)
assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1)))
assert.True(t, strings.Contains(log.Logs[1], "Received nil unprocessed segment id from X-Ray service"))
}

type minTestCase struct {
x int32
y int32
Expand Down
4 changes: 2 additions & 2 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"github.com/aws/aws-xray-daemon/pkg/ringbuffer"
"github.com/aws/aws-xray-daemon/pkg/tracesegment"

"math/rand"
"os"
"github.com/aws/aws-xray-daemon/pkg/cfg"
"github.com/aws/aws-xray-daemon/pkg/conn"
"github.com/aws/aws-xray-daemon/pkg/util/timer"
"math/rand"
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down

0 comments on commit 814dfd8

Please sign in to comment.