Skip to content

Commit

Permalink
move duplicated fields from CloudwatchData to a new JobContext (#1106)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgeckhart authored Sep 11, 2023
1 parent 039ba23 commit 36e4d8d
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 171 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/storagegateway v1.19.6
github.com/aws/aws-sdk-go-v2/service/sts v1.21.5
github.com/aws/smithy-go v1.14.2
github.com/brianvoe/gofakeit/v6 v6.23.1
github.com/go-kit/log v0.2.1
github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db
github.com/prometheus/client_golang v1.16.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ github.com/aws/smithy-go v1.14.2 h1:MJU9hqBGbvWZdApzpvoF2WAIJDbtjK2NDJSiJP7HblQ=
github.com/aws/smithy-go v1.14.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/brianvoe/gofakeit/v6 v6.23.1 h1:k2gX0hQpJStvixDbbw8oJOvPBg0XmHJWbSOF5JkiUHw=
github.com/brianvoe/gofakeit/v6 v6.23.1/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
Expand Down
9 changes: 1 addition & 8 deletions pkg/job/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ func runCustomNamespaceJob(
ctx context.Context,
logger logging.Logger,
job *config.CustomNamespace,
region string,
accountID string,
clientCloudwatch cloudwatch.Client,
metricsPerQuery int,
) []*model.CloudwatchData {
Expand All @@ -27,7 +25,7 @@ func runCustomNamespaceJob(
mux := &sync.Mutex{}
var wg sync.WaitGroup

getMetricDatas := getMetricDataForQueriesForCustomNamespace(ctx, job, region, accountID, clientCloudwatch, logger)
getMetricDatas := getMetricDataForQueriesForCustomNamespace(ctx, job, clientCloudwatch, logger)
metricDataLength := len(getMetricDatas)
if metricDataLength == 0 {
logger.Debug("No metrics data found")
Expand Down Expand Up @@ -87,8 +85,6 @@ func findGetMetricDataByIDForCustomNamespace(getMetricDatas []*model.CloudwatchD
func getMetricDataForQueriesForCustomNamespace(
ctx context.Context,
customNamespaceJob *config.CustomNamespace,
region string,
accountID string,
clientCloudwatch cloudwatch.Client,
logger logging.Logger,
) []*model.CloudwatchData {
Expand Down Expand Up @@ -128,10 +124,7 @@ func getMetricDataForQueriesForCustomNamespace(
Statistics: []string{stats},
NilToZero: metric.NilToZero,
AddCloudwatchTimestamp: metric.AddCloudwatchTimestamp,
CustomTags: customNamespaceJob.CustomTags,
Dimensions: cwMetric.Dimensions,
Region: &region,
AccountID: &accountID,
Period: metric.Period,
})
}
Expand Down
17 changes: 4 additions & 13 deletions pkg/job/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func runDiscoveryJob(
logger logging.Logger,
job *config.Job,
region string,
accountID string,
tagsOnMetrics model.ExportedTagsOnMetrics,
clientTag tagging.Client,
clientCloudwatch cloudwatch.Client,
Expand All @@ -55,7 +54,7 @@ func runDiscoveryJob(
}

svc := config.SupportedServices.GetService(job.Type)
getMetricDatas := getMetricDataForQueries(ctx, logger, job, svc, region, accountID, tagsOnMetrics, clientCloudwatch, resources)
getMetricDatas := getMetricDataForQueries(ctx, logger, job, svc, tagsOnMetrics, clientCloudwatch, resources)
metricDataLength := len(getMetricDatas)
if metricDataLength == 0 {
logger.Info("No metrics data found")
Expand Down Expand Up @@ -175,8 +174,6 @@ func getMetricDataForQueries(
logger logging.Logger,
discoveryJob *config.Job,
svc *config.ServiceConfig,
region string,
accountID string,
tagsOnMetrics model.ExportedTagsOnMetrics,
clientCloudwatch cloudwatch.Client,
resources []*model.TaggedResource,
Expand Down Expand Up @@ -204,7 +201,7 @@ func getMetricDataForQueries(
}

_, err := clientCloudwatch.ListMetrics(ctx, svc.Namespace, metric, discoveryJob.RecentlyActiveOnly, func(page []*model.Metric) {
data := getFilteredMetricDatas(logger, region, accountID, discoveryJob.Type, discoveryJob.CustomTags, tagsOnMetrics, page, discoveryJob.DimensionNameRequirements, metric, assoc)
data := getFilteredMetricDatas(logger, discoveryJob.Type, tagsOnMetrics, page, discoveryJob.DimensionNameRequirements, metric, assoc)

mux.Lock()
getMetricDatas = append(getMetricDatas, data...)
Expand Down Expand Up @@ -234,7 +231,7 @@ func getMetricDataForQueries(
return
}

data := getFilteredMetricDatas(logger, region, accountID, discoveryJob.Type, discoveryJob.CustomTags, tagsOnMetrics, metricsList, discoveryJob.DimensionNameRequirements, metric, assoc)
data := getFilteredMetricDatas(logger, discoveryJob.Type, tagsOnMetrics, metricsList, discoveryJob.DimensionNameRequirements, metric, assoc)

mux.Lock()
getMetricDatas = append(getMetricDatas, data...)
Expand All @@ -249,10 +246,7 @@ func getMetricDataForQueries(

func getFilteredMetricDatas(
logger logging.Logger,
region string,
accountID string,
namespace string,
customTags []model.Tag,
tagsOnMetrics model.ExportedTagsOnMetrics,
metricsList []*model.Metric,
dimensionNameList []string,
Expand Down Expand Up @@ -284,8 +278,8 @@ func getFilteredMetricDatas(
Namespace: namespace,
}
}
metricTags := resource.MetricTags(tagsOnMetrics)

metricTags := resource.MetricTags(tagsOnMetrics)
for _, stats := range m.Statistics {
id := fmt.Sprintf("id_%d", rand.Int())

Expand All @@ -298,10 +292,7 @@ func getFilteredMetricDatas(
NilToZero: m.NilToZero,
AddCloudwatchTimestamp: m.AddCloudwatchTimestamp,
Tags: metricTags,
CustomTags: customTags,
Dimensions: cwMetric.Dimensions,
Region: &region,
AccountID: &accountID,
Period: m.Period,
})
}
Expand Down
18 changes: 1 addition & 17 deletions pkg/job/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {
},
[]model.CloudwatchData{
{
AccountID: aws.String("123123123123"),
AddCloudwatchTimestamp: aws.Bool(false),
Dimensions: []*model.Dimension{
{
Expand All @@ -108,7 +107,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {
Namespace: aws.String("efs"),
NilToZero: aws.Bool(false),
Period: 60,
Region: aws.String("us-east-1"),
Statistics: []string{
"Average",
},
Expand Down Expand Up @@ -178,7 +176,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {
},
[]model.CloudwatchData{
{
AccountID: aws.String("123123123123"),
AddCloudwatchTimestamp: aws.Bool(false),
Dimensions: []*model.Dimension{
{
Expand All @@ -191,7 +188,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {
Namespace: aws.String("ec2"),
NilToZero: aws.Bool(false),
Period: 60,
Region: aws.String("us-east-1"),
Statistics: []string{
"Average",
},
Expand Down Expand Up @@ -261,7 +257,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {
},
[]model.CloudwatchData{
{
AccountID: aws.String("123123123123"),
AddCloudwatchTimestamp: aws.Bool(false),
Dimensions: []*model.Dimension{
{
Expand All @@ -274,7 +269,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {
Namespace: aws.String("kafka"),
NilToZero: aws.Bool(false),
Period: 60,
Region: aws.String("us-east-1"),
Statistics: []string{
"Average",
},
Expand Down Expand Up @@ -386,7 +380,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {
},
[]model.CloudwatchData{
{
AccountID: aws.String("123123123123"),
AddCloudwatchTimestamp: aws.Bool(false),
Dimensions: []*model.Dimension{
{
Expand All @@ -403,7 +396,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {
Namespace: aws.String("alb"),
NilToZero: aws.Bool(false),
Period: 60,
Region: aws.String("us-east-1"),
Statistics: []string{
"Sum",
},
Expand All @@ -415,14 +407,11 @@ func Test_getFilteredMetricDatas(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assoc := associator.NewAssociator(tt.args.dimensionRegexps, tt.args.resources)
metricDatas := getFilteredMetricDatas(logging.NewNopLogger(), tt.args.region, tt.args.accountID, tt.args.namespace, tt.args.customTags, tt.args.tagsOnMetrics, tt.args.metricsList, tt.args.dimensionNameRequirements, tt.args.m, assoc)
metricDatas := getFilteredMetricDatas(logging.NewNopLogger(), tt.args.namespace, tt.args.tagsOnMetrics, tt.args.metricsList, tt.args.dimensionNameRequirements, tt.args.m, assoc)
if len(metricDatas) != len(tt.wantGetMetricsData) {
t.Errorf("len(getFilteredMetricDatas()) = %v, want %v", len(metricDatas), len(tt.wantGetMetricsData))
}
for i, got := range metricDatas {
if *got.AccountID != *tt.wantGetMetricsData[i].AccountID {
t.Errorf("getFilteredMetricDatas().AccountId = %v, want %v", *got.AccountID, *tt.wantGetMetricsData[i].AccountID)
}
if *got.ID != *tt.wantGetMetricsData[i].ID {
t.Errorf("getFilteredMetricDatas().ID = %v, want %v", *got.ID, *tt.wantGetMetricsData[i].ID)
}
Expand All @@ -447,9 +436,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {
if !reflect.DeepEqual(got.Statistics, tt.wantGetMetricsData[i].Statistics) {
t.Errorf("getFilteredMetricDatas().Statistics = %+v, want %+v", got.Statistics, tt.wantGetMetricsData[i].Statistics)
}
if *got.Region != *tt.wantGetMetricsData[i].Region {
t.Errorf("getFilteredMetricDatas().Region = %v, want %v", *got.Region, *tt.wantGetMetricsData[i].Region)
}
if !reflect.DeepEqual(got.Tags, tt.wantGetMetricsData[i].Tags) {
t.Errorf("getFilteredMetricDatas().Tags = %+v, want %+v", got.Tags, tt.wantGetMetricsData[i].Tags)
}
Expand All @@ -460,7 +446,6 @@ func Test_getFilteredMetricDatas(t *testing.T) {

func getSampleMetricDatas(id string) *model.CloudwatchData {
return &model.CloudwatchData{
AccountID: aws.String("123123123123"),
AddCloudwatchTimestamp: aws.Bool(false),
Dimensions: []*model.Dimension{
{
Expand All @@ -478,7 +463,6 @@ func getSampleMetricDatas(id string) *model.CloudwatchData {
Namespace: aws.String("efs"),
NilToZero: aws.Bool(false),
Period: 60,
Region: aws.String("us-east-1"),
Statistics: []string{
"Average",
},
Expand Down
46 changes: 34 additions & 12 deletions pkg/job/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ func ScrapeAwsData(
metricsPerQuery int,
cloudWatchAPIConcurrency int,
taggingAPIConcurrency int,
) ([]*model.TaggedResource, []*model.CloudwatchData) {
) ([][]*model.TaggedResource, []model.CloudwatchMetricResult) {
mux := &sync.Mutex{}
cwData := make([]*model.CloudwatchData, 0)
awsInfoData := make([]*model.TaggedResource, 0)
cwData := make([]model.CloudwatchMetricResult, 0)
awsInfoData := make([][]*model.TaggedResource, 0)
var wg sync.WaitGroup

for _, discoveryJob := range cfg.Discovery.Jobs {
Expand All @@ -38,16 +38,24 @@ func ScrapeAwsData(
}
jobLogger = jobLogger.With("account", accountID)

resources, metrics := runDiscoveryJob(ctx, jobLogger, discoveryJob, region, accountID, cfg.Discovery.ExportedTagsOnMetrics, factory.GetTaggingClient(region, role, taggingAPIConcurrency), factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency), metricsPerQuery, cloudWatchAPIConcurrency)
resources, metrics := runDiscoveryJob(ctx, jobLogger, discoveryJob, region, cfg.Discovery.ExportedTagsOnMetrics, factory.GetTaggingClient(region, role, taggingAPIConcurrency), factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency), metricsPerQuery, cloudWatchAPIConcurrency)
metricResult := model.CloudwatchMetricResult{
Context: &model.JobContext{
Region: region,
AccountID: accountID,
CustomTags: discoveryJob.CustomTags,
},
Data: metrics,
}

addDataToOutput := len(metrics) != 0
if config.FlagsFromCtx(ctx).IsFeatureEnabled(config.AlwaysReturnInfoMetrics) {
addDataToOutput = addDataToOutput || len(resources) != 0
}
if addDataToOutput {
mux.Lock()
awsInfoData = append(awsInfoData, resources...)
cwData = append(cwData, metrics...)
awsInfoData = append(awsInfoData, resources)
cwData = append(cwData, metricResult)
mux.Unlock()
}
}(discoveryJob, region, role)
Expand All @@ -69,10 +77,17 @@ func ScrapeAwsData(
}
jobLogger = jobLogger.With("account", accountID)

metrics := runStaticJob(ctx, jobLogger, staticJob, region, accountID, factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency))

metrics := runStaticJob(ctx, jobLogger, staticJob, factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency))
metricResult := model.CloudwatchMetricResult{
Context: &model.JobContext{
Region: region,
AccountID: accountID,
CustomTags: staticJob.CustomTags,
},
Data: metrics,
}
mux.Lock()
cwData = append(cwData, metrics...)
cwData = append(cwData, metricResult)
mux.Unlock()
}(staticJob, region, role)
}
Expand All @@ -95,10 +110,17 @@ func ScrapeAwsData(
}
jobLogger = jobLogger.With("account", accountID)

metrics := runCustomNamespaceJob(ctx, jobLogger, customNamespaceJob, region, accountID, factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency), metricsPerQuery)

metrics := runCustomNamespaceJob(ctx, jobLogger, customNamespaceJob, factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency), metricsPerQuery)
metricResult := model.CloudwatchMetricResult{
Context: &model.JobContext{
Region: region,
AccountID: accountID,
CustomTags: customNamespaceJob.CustomTags,
},
Data: metrics,
}
mux.Lock()
cwData = append(cwData, metrics...)
cwData = append(cwData, metricResult)
mux.Unlock()
}(customNamespaceJob, region, role)
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/job/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ func runStaticJob(
ctx context.Context,
logger logging.Logger,
resource *config.Static,
region string,
accountID string,
clientCloudwatch cloudwatch.Client,
) []*model.CloudwatchData {
cw := []*model.CloudwatchData{}
Expand All @@ -36,10 +34,7 @@ func runStaticJob(
Statistics: metric.Statistics,
NilToZero: metric.NilToZero,
AddCloudwatchTimestamp: metric.AddCloudwatchTimestamp,
CustomTags: resource.CustomTags,
Dimensions: createStaticDimensions(resource.Dimensions),
Region: &region,
AccountID: &accountID,
}

data.Points = clientCloudwatch.GetMetricStatistics(ctx, logger, data.Dimensions, resource.Namespace, metric)
Expand Down
14 changes: 11 additions & 3 deletions pkg/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ type Datapoint struct {
Timestamp *time.Time
}

type CloudwatchMetricResult struct {
Context *JobContext
Data []*CloudwatchData
}

type JobContext struct {
Region string
AccountID string
CustomTags []Tag
}

// CloudwatchData is an internal representation of a CloudWatch
// metric with attached data points, metric and resource information.
type CloudwatchData struct {
Expand All @@ -70,11 +81,8 @@ type CloudwatchData struct {
GetMetricDataTimestamps time.Time
NilToZero *bool
AddCloudwatchTimestamp *bool
CustomTags []Tag
Tags []Tag
Dimensions []*Dimension
Region *string
AccountID *string
Period int64
}

Expand Down
Loading

0 comments on commit 36e4d8d

Please sign in to comment.