diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index af10e7c5..7239f189 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -745,6 +745,8 @@ func (b *backend) Heartbeat() error { rc := b.rp.Get() defer rc.Close() + metrics := b.stats.Extract().ToMetrics() + active, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:active", msgQueueName), "0", "-1")) if err != nil { return fmt.Errorf("error getting active queues: %w", err) @@ -773,7 +775,7 @@ func (b *backend) Heartbeat() error { bulkSize += count } - // get our DB and redis stats + // calculate DB and redis pool metrics dbStats := b.db.Stats() redisStats := b.rp.Stats() dbWaitDurationInPeriod := dbStats.WaitDuration - b.dbWaitDuration @@ -781,40 +783,16 @@ func (b *backend) Heartbeat() error { b.dbWaitDuration = dbStats.WaitDuration b.redisWaitDuration = redisStats.WaitDuration - stats := b.stats.Extract() - - metrics := make([]cwtypes.MetricDatum, 0, 10) hostDim := cwatch.Dimension("Host", b.config.InstanceID) - metrics = append(metrics, cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim), cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Second), cwtypes.StandardUnitSeconds, hostDim), cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim), cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Second), cwtypes.StandardUnitSeconds, hostDim), - cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")), cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")), - cwatch.Datum("ContactsCreated", float64(stats.ContactsCreated), cwtypes.StandardUnitCount), ) - metrics = append(metrics, stats.IncomingRequests.Metrics("IncomingRequests")...) - metrics = append(metrics, stats.IncomingMessages.Metrics("IncomingMessages")...) - metrics = append(metrics, stats.IncomingStatuses.Metrics("IncomingStatuses")...) - metrics = append(metrics, stats.IncomingEvents.Metrics("IncomingEvents")...) - metrics = append(metrics, stats.IncomingIgnored.Metrics("IncomingIgnored")...) - metrics = append(metrics, stats.OutgoingSends.Metrics("OutgoingSends")...) - metrics = append(metrics, stats.OutgoingErrors.Metrics("OutgoingErrors")...) - - // turn our duration stats into averages for metrics - for cType, count := range stats.IncomingDuration { - avgTime := float64(count) / float64(stats.IncomingRequests[cType]) - metrics = append(metrics, cwatch.Datum("IncomingDuration", float64(avgTime), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType)))) - } - for cType, duration := range stats.OutgoingDuration { - avgTime := float64(duration) / float64(stats.OutgoingSends[cType]+stats.OutgoingErrors[cType]) - metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgTime, cwtypes.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(cType)))) - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) if err := b.cw.Send(ctx, metrics...); err != nil { slog.Error("error sending metrics", "error", err) diff --git a/backends/rapidpro/stats.go b/backends/rapidpro/stats.go index 734713fb..2b44ba8c 100644 --- a/backends/rapidpro/stats.go +++ b/backends/rapidpro/stats.go @@ -11,8 +11,8 @@ import ( type CountByType map[courier.ChannelType]int -// Metrics converts per channel counts into cloudwatch metrics with type as a dimension -func (c CountByType) Metrics(name string) []types.MetricDatum { +// converts per channel counts into cloudwatch metrics with type as a dimension +func (c CountByType) metrics(name string) []types.MetricDatum { m := make([]types.MetricDatum, 0, len(c)) for typ, count := range c { m = append(m, cwatch.Datum(name, float64(count), types.StandardUnitCount, cwatch.Dimension("ChannelType", string(typ)))) @@ -54,6 +54,31 @@ func newStats() *Stats { } } +func (s *Stats) ToMetrics() []types.MetricDatum { + metrics := make([]types.MetricDatum, 0, 20) + metrics = append(metrics, s.IncomingRequests.metrics("IncomingRequests")...) + metrics = append(metrics, s.IncomingMessages.metrics("IncomingMessages")...) + metrics = append(metrics, s.IncomingStatuses.metrics("IncomingStatuses")...) + metrics = append(metrics, s.IncomingEvents.metrics("IncomingEvents")...) + metrics = append(metrics, s.IncomingIgnored.metrics("IncomingIgnored")...) + + for typ, d := range s.IncomingDuration { // convert to averages + avgTime := float64(d) / float64(s.IncomingRequests[typ]) + metrics = append(metrics, cwatch.Datum("IncomingDuration", float64(avgTime), types.StandardUnitCount, cwatch.Dimension("ChannelType", string(typ)))) + } + + metrics = append(metrics, s.OutgoingSends.metrics("OutgoingSends")...) + metrics = append(metrics, s.OutgoingErrors.metrics("OutgoingErrors")...) + + for typ, d := range s.OutgoingDuration { // convert to averages + avgTime := float64(d) / float64(s.OutgoingSends[typ]+s.OutgoingErrors[typ]) + metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgTime, types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ)))) + } + + metrics = append(metrics, cwatch.Datum("ContactsCreated", float64(s.ContactsCreated), types.StandardUnitCount)) + return metrics +} + // StatsCollector provides threadsafe stats collection type StatsCollector struct { mutex sync.Mutex diff --git a/backends/rapidpro/stats_test.go b/backends/rapidpro/stats_test.go index 1badabc8..caf92fd0 100644 --- a/backends/rapidpro/stats_test.go +++ b/backends/rapidpro/stats_test.go @@ -32,6 +32,9 @@ func TestStats(t *testing.T) { assert.Equal(t, rapidpro.CountByType{}, stats.OutgoingErrors) assert.Equal(t, rapidpro.DurationByType{"T": time.Second * 2, "FBA": time.Second * 3}, stats.OutgoingDuration) + metrics := stats.ToMetrics() + assert.Len(t, metrics, 8) + sc.RecordOutgoing("FBA", true, time.Second) sc.RecordOutgoing("FBA", true, time.Second) @@ -46,4 +49,7 @@ func TestStats(t *testing.T) { assert.Equal(t, rapidpro.CountByType{"FBA": 2}, stats.OutgoingSends) assert.Equal(t, rapidpro.CountByType{}, stats.OutgoingErrors) assert.Equal(t, rapidpro.DurationByType{"FBA": time.Second * 2}, stats.OutgoingDuration) + + metrics = stats.ToMetrics() + assert.Len(t, metrics, 3) }