Skip to content

Commit

Permalink
Add E2E latency metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Nov 13, 2024
1 parent 9302e89 commit 94fff43
Show file tree
Hide file tree
Showing 15 changed files with 246 additions and 34 deletions.
4 changes: 4 additions & 0 deletions assets/docs/configuration/metrics/e2e-latency-example.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
metrics {
# Optional toggle for E2E latency (difference between Snowplow collector timestamp and target write timestamp).
# Defaults to false when omitted.
enable_e2e_latency = true
}
5 changes: 5 additions & 0 deletions assets/docs/configuration/overview-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ retry {
}
}

metrics {
# Optional toggle for E2E latency (difference between Snowplow collector timestamp and target write timestamp).
# Defaults to false when omitted.
enable_e2e_latency = true
}

license {
accept = true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
metrics {
enable_e2e_latency = true
}
10 changes: 9 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type configurationData struct {
DisableTelemetry bool `hcl:"disable_telemetry,optional"`
License *licenseConfig `hcl:"license,block"`
Retry *retryConfig `hcl:"retry,block"`
Metrics *metricsConfig `hcl:"metrics,block"`
}

// component is a type to abstract over configuration blocks.
Expand Down Expand Up @@ -100,6 +101,10 @@ type retryConfig struct {
Setup *setupRetryConfig `hcl:"setup,block"`
}

// metricsConfig holds the settings decoded from the `metrics` configuration block.
type metricsConfig struct {
// E2ELatencyEnabled is read from the optional `enable_e2e_latency` attribute.
// The default configuration sets it to false.
E2ELatencyEnabled bool `hcl:"enable_e2e_latency,optional"`
}

type transientRetryConfig struct {
Delay int `hcl:"delay_ms,optional"`
MaxAttempts int `hcl:"max_attempts,optional"`
Expand Down Expand Up @@ -142,6 +147,9 @@ func defaultConfigData() *configurationData {
Delay: 20000,
},
},
Metrics: &metricsConfig{
E2ELatencyEnabled: false,
},
}
}

Expand Down Expand Up @@ -359,7 +367,7 @@ func (c *Config) getStatsReceiver(tags map[string]string) (statsreceiveriface.St
switch useReceiver.Name {
case "statsd":
plug := statsreceiver.AdaptStatsDStatsReceiverFunc(
statsreceiver.NewStatsDReceiverWithTags(tags),
statsreceiver.NewStatsDReceiverWithTags(tags, c.Data.Metrics.E2ELatencyEnabled),
)
component, err := c.CreateComponent(plug, decoderOpts)
if err != nil {
Expand Down
30 changes: 30 additions & 0 deletions docs/configuration_metrics_docs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package docs

import (
"path/filepath"
"testing"

"github.com/snowplow/snowbridge/assets"
"github.com/stretchr/testify/assert"
)

// TestMetricsConfigDocumentation loads the documented E2E latency example
// (docs/configuration/metrics/e2e-latency-example.hcl under the assets root)
// and checks that the parsed metrics block enables E2E latency.
func TestMetricsConfigDocumentation(t *testing.T) {
	assert := assert.New(t)
	configPath := filepath.Join(assets.AssetsRootDir, "docs", "configuration", "metrics", "e2e-latency-example.hcl")
	c := getConfigFromFilepath(t, configPath)

	metricsConfig := c.Data.Metrics
	// assert.NotNil only records the failure and keeps going; bail out here so
	// the field access below cannot panic on a nil pointer and bury the real
	// assertion message under a stack trace.
	if !assert.NotNil(metricsConfig) {
		return
	}
	assert.True(metricsConfig.E2ELatencyEnabled)
}
3 changes: 3 additions & 0 deletions pkg/models/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Message struct {
Data []byte
HTTPHeaders map[string]string

// CollectorTstamp is the timestamp created by the Snowplow collector, extracted from the `collector_tstamp` atomic field. Used to measure E2E latency
CollectorTstamp time.Time

// TimeCreated is when the message was created originally
TimeCreated time.Time

Expand Down
21 changes: 20 additions & 1 deletion pkg/models/observer_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type ObserverBuffer struct {
MaxRequestLatency time.Duration
MinRequestLatency time.Duration
SumRequestLatency time.Duration
MaxE2ELatency time.Duration
MinE2ELatency time.Duration
SumE2ELatency time.Duration
}

// AppendWrite adds a normal TargetWriteResult onto the buffer and stores the result
Expand Down Expand Up @@ -128,6 +131,14 @@ func (b *ObserverBuffer) appendWriteResult(res *TargetWriteResult) {
b.MinRequestLatency = res.MinRequestLatency
}
b.SumRequestLatency += res.AvgRequestLatency

if b.MaxE2ELatency < res.MaxE2ELatency {
b.MaxE2ELatency = res.MaxE2ELatency
}
if b.MinE2ELatency > res.MinE2ELatency || b.MinE2ELatency == time.Duration(0) {
b.MinE2ELatency = res.MinE2ELatency
}
b.SumE2ELatency += res.AvgE2ELatency
}

// AppendFiltered adds a FilterResult onto the buffer and stores the result
Expand Down Expand Up @@ -180,9 +191,14 @@ func (b *ObserverBuffer) GetAvgRequestLatency() time.Duration {
return common.GetAverageFromDuration(b.SumRequestLatency, b.MsgTotal)
}

// GetAvgE2ELatency reports the buffer's average E2E latency, obtained by
// passing the accumulated SumE2ELatency and the MsgTotal count to
// common.GetAverageFromDuration.
func (b *ObserverBuffer) GetAvgE2ELatency() time.Duration {
	sum, count := b.SumE2ELatency, b.MsgTotal
	return common.GetAverageFromDuration(sum, count)
}

func (b *ObserverBuffer) String() string {
return fmt.Sprintf(
"TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d,SumTransformLatency:%d,SumProcLatency:%d,SumMsgLatency:%d,MinReqLatency:%d,MaxReqLatency:%d,SumReqLatency:%d",
"TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d,SumTransformLatency:%d,SumProcLatency:%d,SumMsgLatency:%d,MinReqLatency:%d,MaxReqLatency:%d,SumReqLatency:%d,MinE2ELatency:%d,MaxE2ELatency:%d,SumE2ELatency:%d",
b.TargetResults,
b.MsgFiltered,
b.MsgSent,
Expand All @@ -203,5 +219,8 @@ func (b *ObserverBuffer) String() string {
b.MinRequestLatency.Milliseconds(),
b.MaxRequestLatency.Milliseconds(),
b.SumRequestLatency.Milliseconds(),
b.MinE2ELatency.Milliseconds(),
b.MaxE2ELatency.Milliseconds(),
b.SumE2ELatency.Milliseconds(),
)
}
13 changes: 10 additions & 3 deletions pkg/models/observer_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestObserverBuffer(t *testing.T) {
{
Data: []byte("Baz"),
PartitionKey: "partition1",
CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
Expand All @@ -39,6 +40,7 @@ func TestObserverBuffer(t *testing.T) {
{
Data: []byte("Bar"),
PartitionKey: "partition2",
CollectorTstamp: timeNow.Add(time.Duration(-80) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Expand All @@ -50,6 +52,7 @@ func TestObserverBuffer(t *testing.T) {
{
Data: []byte("Foo"),
PartitionKey: "partition3",
CollectorTstamp: timeNow.Add(time.Duration(-40) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
Expand Down Expand Up @@ -119,7 +122,11 @@ func TestObserverBuffer(t *testing.T) {
assert.Equal(time.Duration(8)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency)

assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000,SumTransformLatency:720000,SumProcLatency:2520000,SumMsgLatency:18000000,MinReqLatency:60000,MaxReqLatency:480000,SumReqLatency:1320000", b.String())
assert.Equal(time.Duration(80)*time.Minute, b.MaxE2ELatency)
assert.Equal(time.Duration(40)*time.Minute, b.MinE2ELatency)
assert.Equal(time.Duration(60)*time.Minute, b.GetAvgE2ELatency())

assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000,SumTransformLatency:720000,SumProcLatency:2520000,SumMsgLatency:18000000,MinReqLatency:60000,MaxReqLatency:480000,SumReqLatency:1320000,MinE2ELatency:2400000,MaxE2ELatency:4800000,SumE2ELatency:21600000", b.String())
}

// TestObserverBuffer_Basic is a basic version of the above test, stripping away all but one event
Expand Down Expand Up @@ -191,7 +198,7 @@ func TestObserverBuffer_Basic(t *testing.T) {
assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency)

assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String())
assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000,MinE2ELatency:0,MaxE2ELatency:0,SumE2ELatency:0", b.String())
}

// TestObserverBuffer_BasicNoTransform is a basic version of the above test, stripping away all but one event.
Expand Down Expand Up @@ -260,5 +267,5 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) {
assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency)

assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String())
assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000,MinE2ELatency:0,MaxE2ELatency:0,SumE2ELatency:0", b.String())
}
26 changes: 26 additions & 0 deletions pkg/models/target_write_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type TargetWriteResult struct {
MaxRequestLatency time.Duration
MinRequestLatency time.Duration
AvgRequestLatency time.Duration

MaxE2ELatency time.Duration
MinE2ELatency time.Duration
AvgE2ELatency time.Duration
}

// NewTargetWriteResult builds a result structure to return from a target write
Expand All @@ -84,6 +88,7 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa
var sumMessageLatency time.Duration
var sumTransformLatency time.Duration
var sumRequestLatency time.Duration
var sumE2ELatency time.Duration

for _, msg := range processed {
procLatency := msg.TimeRequestFinished.Sub(msg.TimePulled)
Expand Down Expand Up @@ -124,13 +129,26 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa
r.MinRequestLatency = requestLatency
}
sumRequestLatency += requestLatency

var e2eLatency time.Duration
if !msg.CollectorTstamp.IsZero() {
e2eLatency = msg.TimeRequestFinished.Sub(msg.CollectorTstamp)
}
if r.MaxE2ELatency < e2eLatency {
r.MaxE2ELatency = e2eLatency
}
if r.MinE2ELatency > e2eLatency || r.MinE2ELatency == time.Duration(0) {
r.MinE2ELatency = e2eLatency
}
sumE2ELatency += e2eLatency
}

if processedLen > 0 {
r.AvgProcLatency = common.GetAverageFromDuration(sumProcLatency, processedLen)
r.AvgMsgLatency = common.GetAverageFromDuration(sumMessageLatency, processedLen)
r.AvgTransformLatency = common.GetAverageFromDuration(sumTransformLatency, processedLen)
r.AvgRequestLatency = common.GetAverageFromDuration(sumRequestLatency, processedLen)
r.AvgE2ELatency = common.GetAverageFromDuration(sumE2ELatency, processedLen)
}

return &r
Expand Down Expand Up @@ -186,6 +204,14 @@ func (wr *TargetWriteResult) Append(nwr *TargetWriteResult) *TargetWriteResult {
wrC.MinRequestLatency = nwr.MinRequestLatency
}
wrC.AvgRequestLatency = common.GetAverageFromDuration(wrC.AvgRequestLatency+nwr.AvgRequestLatency, 2)

if wrC.MaxE2ELatency < nwr.MaxE2ELatency {
wrC.MaxE2ELatency = nwr.MaxE2ELatency
}
if wrC.MinE2ELatency > nwr.MinE2ELatency || wrC.MinE2ELatency == time.Duration(0) {
wrC.MinE2ELatency = nwr.MinE2ELatency
}
wrC.AvgE2ELatency = common.GetAverageFromDuration(wrC.AvgE2ELatency+nwr.AvgE2ELatency, 2)
}

return &wrC
Expand Down
45 changes: 20 additions & 25 deletions pkg/models/target_write_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,10 @@ func TestNewTargetWriteResult_EmptyWithoutTime(t *testing.T) {
assert.Equal(time.Duration(0), r.MaxTransformLatency)
assert.Equal(time.Duration(0), r.MinTransformLatency)
assert.Equal(time.Duration(0), r.AvgTransformLatency)
}

// TestNewTargetWriteResult_EmptyWithTime tests that an empty targetWriteResult with no a provided timestamp will report 0s across the board
func TestNewTargetWriteResult_EmptyWithTime(t *testing.T) {
assert := assert.New(t)

r := NewTargetWriteResult(nil, nil, nil, nil)
assert.NotNil(r)

assert.Equal(int64(0), r.SentCount)
assert.Equal(int64(0), r.FailedCount)
assert.Equal(int64(0), r.Total())

assert.Equal(time.Duration(0), r.MaxProcLatency)
assert.Equal(time.Duration(0), r.MinProcLatency)
assert.Equal(time.Duration(0), r.AvgProcLatency)

assert.Equal(time.Duration(0), r.MaxMsgLatency)
assert.Equal(time.Duration(0), r.MinMsgLatency)
assert.Equal(time.Duration(0), r.AvgMsgLatency)

assert.Equal(time.Duration(0), r.MaxTransformLatency)
assert.Equal(time.Duration(0), r.MinTransformLatency)
assert.Equal(time.Duration(0), r.AvgTransformLatency)
assert.Equal(time.Duration(0), r.MaxE2ELatency)
assert.Equal(time.Duration(0), r.MinE2ELatency)
assert.Equal(time.Duration(0), r.AvgE2ELatency)
}

// TestNewTargetWriteResult_WithMessages tests that reporting of statistics is as it should be when we have all data
Expand All @@ -76,6 +56,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Baz"),
PartitionKey: "partition1",
CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
Expand All @@ -84,6 +65,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Bar"),
PartitionKey: "partition2",
CollectorTstamp: timeNow.Add(time.Duration(-80) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Expand All @@ -94,6 +76,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Foo"),
PartitionKey: "partition3",
CollectorTstamp: timeNow.Add(time.Duration(-40) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
Expand All @@ -116,11 +99,15 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
assert.Equal(time.Duration(3)*time.Minute, r.MaxTransformLatency)
assert.Equal(time.Duration(1)*time.Minute, r.MinTransformLatency)
assert.Equal(time.Duration(2)*time.Minute, r.AvgTransformLatency)
assert.Equal(time.Duration(80)*time.Minute, r.MaxE2ELatency)
assert.Equal(time.Duration(40)*time.Minute, r.MinE2ELatency)
assert.Equal(time.Duration(60)*time.Minute, r.AvgE2ELatency)

sent1 := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-2) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute),
Expand All @@ -131,6 +118,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Bar"),
PartitionKey: "partition2",
CollectorTstamp: timeNow.Add(time.Duration(-120) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Expand All @@ -139,6 +127,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Foo"),
PartitionKey: "partition3",
CollectorTstamp: timeNow.Add(time.Duration(-30) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-15) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute),
Expand Down Expand Up @@ -172,10 +161,13 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
assert.Equal(time.Duration(8)*time.Minute, r3.MaxTransformLatency)
assert.Equal(time.Duration(1)*time.Minute, r3.MinTransformLatency)
assert.Equal(time.Duration(3)*time.Minute, r3.AvgTransformLatency)
assert.Equal(time.Duration(120)*time.Minute, r3.MaxE2ELatency)
assert.Equal(time.Duration(30)*time.Minute, r3.MinE2ELatency)
assert.Equal(time.Duration(65)*time.Minute, r3.AvgE2ELatency)
}

// TestNewTargetWriteResult_NoTransformation tests that reporting of statistics is as it should be when we don't have a timeTransformed
func TestNewTargetWriteResult_NoTransformation(t *testing.T) {
// TestNewTargetWriteResult_NoTransformation_NoE2E tests that reporting of statistics is as it should be when we have neither a timeTransformed nor a collector timestamp
func TestNewTargetWriteResult_NoTransformation_NoE2E(t *testing.T) {
assert := assert.New(t)

timeNow := time.Now().UTC()
Expand Down Expand Up @@ -221,4 +213,7 @@ func TestNewTargetWriteResult_NoTransformation(t *testing.T) {
assert.Equal(time.Duration(0), r.MaxTransformLatency)
assert.Equal(time.Duration(0), r.MinTransformLatency)
assert.Equal(time.Duration(0), r.AvgTransformLatency)
assert.Equal(time.Duration(0), r.MaxE2ELatency)
assert.Equal(time.Duration(0), r.MinE2ELatency)
assert.Equal(time.Duration(0), r.AvgE2ELatency)
}
Loading

0 comments on commit 94fff43

Please sign in to comment.