diff --git a/CHANGELOG.md b/CHANGELOG.md
index ea9843202..442abc78e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,7 +7,7 @@ **Bugfixes and features**
Features:
-* ...
+* Add new CloudWatch API concurrency limiter by @thepalbi
Bugs:
* ...
diff --git a/cmd/yace/main.go b/cmd/yace/main.go
index ea31f997b..e38e8b073 100644
--- a/cmd/yace/main.go
+++ b/cmd/yace/main.go
@@ -12,6 +12,7 @@ import (
    "golang.org/x/sync/semaphore"
    exporter "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg"
+   "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
    v1 "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/v1"
    v2 "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/v2"
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
@@ -46,7 +47,7 @@ var (
    debug bool
    logFormat string
    fips bool
-   cloudwatchConcurrency int
+   cloudwatchConcurrency cloudwatch.ConcurrencyConfig
    tagConcurrency int
    scrapingInterval int
    metricsPerQuery int
@@ -126,9 +127,33 @@ func NewYACEApp() *cli.App {
        },
        &cli.IntFlag{
            Name: "cloudwatch-concurrency",
-           Value: exporter.DefaultCloudWatchAPIConcurrency,
+           Value: exporter.DefaultCloudwatchConcurrency.SingleLimit,
            Usage: "Maximum number of concurrent requests to CloudWatch API.",
-           Destination: &cloudwatchConcurrency,
+           Destination: &cloudwatchConcurrency.SingleLimit,
+       },
+       &cli.BoolFlag{
+           Name: "cloudwatch-concurrency.per-api-limit-enabled",
+           Value: exporter.DefaultCloudwatchConcurrency.PerAPILimitEnabled,
+           Usage: "Whether to enable the per-API CloudWatch concurrency limiter. When enabled, the `-cloudwatch-concurrency` flag will be ignored.",
+           Destination: &cloudwatchConcurrency.PerAPILimitEnabled,
+       },
+       &cli.IntFlag{
+           Name: "cloudwatch-concurrency.list-metrics-limit",
+           Value: exporter.DefaultCloudwatchConcurrency.ListMetrics,
+           Usage: "Maximum number of concurrent requests to the ListMetrics CloudWatch API. Used if the -cloudwatch-concurrency.per-api-limit-enabled limiter is enabled.",
+           Destination: &cloudwatchConcurrency.ListMetrics,
+       },
+       &cli.IntFlag{
+           Name: "cloudwatch-concurrency.get-metric-data-limit",
+           Value: exporter.DefaultCloudwatchConcurrency.GetMetricData,
+           Usage: "Maximum number of concurrent requests to the GetMetricData CloudWatch API. Used if the -cloudwatch-concurrency.per-api-limit-enabled limiter is enabled.",
+           Destination: &cloudwatchConcurrency.GetMetricData,
+       },
+       &cli.IntFlag{
+           Name: "cloudwatch-concurrency.get-metric-statistics-limit",
+           Value: exporter.DefaultCloudwatchConcurrency.GetMetricStatistics,
+           Usage: "Maximum number of concurrent requests to the GetMetricStatistics CloudWatch API. Used if the -cloudwatch-concurrency.per-api-limit-enabled limiter is enabled.",
+           Destination: &cloudwatchConcurrency.GetMetricStatistics,
        },
        &cli.IntFlag{
            Name: "tag-concurrency",
@@ -208,6 +233,11 @@ func NewYACEApp() *cli.App {
func startScraper(c *cli.Context) error {
    logger = logging.NewLogger(logFormat, debug, "version", version)
+   // log a warning if both concurrency limiting methods are configured via the CLI
+   if c.IsSet("cloudwatch-concurrency") && c.IsSet("cloudwatch-concurrency.per-api-limit-enabled") {
+       logger.Warn("Both `cloudwatch-concurrency` and `cloudwatch-concurrency.per-api-limit-enabled` are set. `cloudwatch-concurrency` will be ignored, and the per-API concurrency limiting strategy will be favoured.")
+   }
+
    logger.Info("Parsing config")
    if err := cfg.Load(configFile, logger); err != nil {
        return fmt.Errorf("Couldn't read %s: %w", configFile, err)
diff --git a/cmd/yace/scraper.go b/cmd/yace/scraper.go
index 1c68d07e7..c572650a8 100644
--- a/cmd/yace/scraper.go
+++ b/cmd/yace/scraper.go
@@ -81,17 +81,27 @@ func (s *scraper) scrape(ctx context.Context, logger logging.Logger, cache cachi
    // clearing, so we always clear credentials before the next scrape
    cache.Refresh()
    defer cache.Clear()
+
+   options := []exporter.OptionsFunc{
+       exporter.MetricsPerQuery(metricsPerQuery),
+       exporter.LabelsSnakeCase(labelsSnakeCase),
+       exporter.EnableFeatureFlag(s.featureFlags...),
+       exporter.TaggingAPIConcurrency(tagConcurrency),
+   }
+
+   if cloudwatchConcurrency.PerAPILimitEnabled {
+       options = append(options, exporter.CloudWatchPerAPILimitConcurrency(cloudwatchConcurrency.ListMetrics, cloudwatchConcurrency.GetMetricData, cloudwatchConcurrency.GetMetricStatistics))
+   } else {
+       options = append(options, exporter.CloudWatchAPIConcurrency(cloudwatchConcurrency.SingleLimit))
+   }
+
    err := exporter.UpdateMetrics(
        ctx,
        logger,
        cfg,
        newRegistry,
        cache,
-       exporter.MetricsPerQuery(metricsPerQuery),
-       exporter.LabelsSnakeCase(labelsSnakeCase),
-       exporter.EnableFeatureFlag(s.featureFlags...),
-       exporter.CloudWatchAPIConcurrency(cloudwatchConcurrency),
-       exporter.TaggingAPIConcurrency(tagConcurrency),
+       options...,
    )
    if err != nil {
        logger.Error(err, "error updating metrics")
diff --git a/docs/configuration.md b/docs/configuration.md
index fbf3591e5..1a19b2cb2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -21,6 +21,10 @@ All flags may be prefixed with either one hypen or two (i.e., both `-config.file
| `-debug` | Log at debug level | `false` |
| `-fips` | Use FIPS compliant AWS API | `false` |
| `-cloudwatch-concurrency` | Maximum number of concurrent requests to CloudWatch API | `5` |
+| `-cloudwatch-concurrency.per-api-limit-enabled` | Enables a concurrency limiter with a specific limit per CloudWatch API call. | `false` |
+| `-cloudwatch-concurrency.list-metrics-limit` | Maximum number of concurrent requests to the CloudWatch `ListMetrics` API. Only applicable if `per-api-limit-enabled` is `true`. | `5` |
+| `-cloudwatch-concurrency.get-metric-data-limit` | Maximum number of concurrent requests to the CloudWatch `GetMetricData` API. Only applicable if `per-api-limit-enabled` is `true`. | `5` |
+| `-cloudwatch-concurrency.get-metric-statistics-limit` | Maximum number of concurrent requests to the CloudWatch `GetMetricStatistics` API. Only applicable if `per-api-limit-enabled` is `true`. | `5` |
| `-tag-concurrency` | Maximum number of concurrent requests to Resource Tagging API | `5` |
| `-scraping-interval` | Seconds to wait between scraping the AWS metrics | `300` |
| `-metrics-per-query` | Number of metrics made in a single GetMetricsData request | `500` |
diff --git a/pkg/clients/cloudwatch/client.go b/pkg/clients/cloudwatch/client.go
index 0af7330ed..096453ff2 100644
--- a/pkg/clients/cloudwatch/client.go
+++ b/pkg/clients/cloudwatch/client.go
@@ -9,6 +9,12 @@ import (
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)
+const (
+   listMetricsCall         = "ListMetrics"
+   getMetricDataCall       = "GetMetricData"
+   getMetricStatisticsCall = "GetMetricStatistics"
+)
+
type Client interface {
    // ListMetrics returns the list of metrics and dimensions for a given namespace
    // and metric name. Results pagination is handled automatically: the caller can
@@ -23,6 +29,20 @@ type Client interface {
    GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *config.Metric) []*model.Datapoint
}
+// ConcurrencyLimiter limits the concurrency when calling AWS CloudWatch APIs. The functions implemented
+// by this interface follow the same semantics as a normal semaphore, but accept an operation identifier. Some
+// implementations might use this to keep a separate semaphore, with a different limit, per operation.
+type ConcurrencyLimiter interface {
+   // Acquire takes one "ticket" from the concurrency limiter for op. If there's none available, the calling
+   // goroutine will be blocked until there's room available.
+   Acquire(op string)
+
+   // Release gives back one "ticket" to the concurrency limiter identified by op. If there are one or more
+   // goroutines waiting for one, one of them will be woken up.
+   Release(op string)
+}
+
type MetricDataResult struct {
    ID string
    Datapoint float64
@@ -30,34 +50,34 @@ type MetricDataResult struct {
}
type limitedConcurrencyClient struct {
-   client Client
-   sem    chan struct{}
+   client  Client
+   limiter ConcurrencyLimiter
}
-func NewLimitedConcurrencyClient(client Client, maxConcurrency int) Client {
+func NewLimitedConcurrencyClient(client Client, limiter ConcurrencyLimiter) Client {
    return &limitedConcurrencyClient{
-       client: client,
-       sem:    make(chan struct{}, maxConcurrency),
+       client:  client,
+       limiter: limiter,
    }
}
func (c limitedConcurrencyClient) GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *config.Metric) []*model.Datapoint {
-   c.sem <- struct{}{}
+   c.limiter.Acquire(getMetricStatisticsCall)
    res := c.client.GetMetricStatistics(ctx, logger, dimensions, namespace, metric)
-   <-c.sem
+   c.limiter.Release(getMetricStatisticsCall)
    return res
}
func (c limitedConcurrencyClient) GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []MetricDataResult {
-   c.sem <- struct{}{}
+   c.limiter.Acquire(getMetricDataCall)
    res := c.client.GetMetricData(ctx, logger, getMetricData, namespace, length, delay, configuredRoundingPeriod)
-   <-c.sem
+   c.limiter.Release(getMetricDataCall)
    return res
}
func (c limitedConcurrencyClient) ListMetrics(ctx context.Context, namespace string, metric *config.Metric, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error) {
-   c.sem <- struct{}{}
+   c.limiter.Acquire(listMetricsCall)
    res, err := c.client.ListMetrics(ctx, namespace, metric, recentlyActiveOnly, fn)
-   <-c.sem
+   c.limiter.Release(listMetricsCall)
    return res, err
}
diff --git a/pkg/clients/cloudwatch/concurrency.go b/pkg/clients/cloudwatch/concurrency.go
new file mode 100644
index 000000000..fbac3a94d
--- /dev/null
+++ b/pkg/clients/cloudwatch/concurrency.go
@@ -0,0 +1,105 @@
+package cloudwatch
+
+// ConcurrencyConfig configures how concurrency should be limited in a Cloudwatch API client. It allows
+// one to pick between two limiter implementations: one with a single shared limit, or one with a
+// different limit per API call.
+type ConcurrencyConfig struct {
+   // PerAPILimitEnabled configures whether to use a separate limit per API call.
+   PerAPILimitEnabled bool
+
+   // SingleLimit configures the concurrency limit when using a single limiter for all API calls.
+   SingleLimit int
+
+   // ListMetrics limits the number of concurrent ListMetrics API calls.
+   ListMetrics int
+
+   // GetMetricData limits the number of concurrent GetMetricData API calls.
+   GetMetricData int
+
+   // GetMetricStatistics limits the number of concurrent GetMetricStatistics API calls.
+   GetMetricStatistics int
+}
+
+// semaphore implements a simple semaphore using a channel.
+type semaphore chan struct{}
+
+// newSemaphore creates a new semaphore with the given limit.
+func newSemaphore(limit int) semaphore {
+   return make(semaphore, limit)
+}
+
+func (s semaphore) Acquire() {
+   s <- struct{}{}
+}
+
+func (s semaphore) Release() {
+   <-s
+}
+
+// NewLimiter creates a new ConcurrencyLimiter according to the ConcurrencyConfig.
+func (cfg ConcurrencyConfig) NewLimiter() ConcurrencyLimiter {
+   if cfg.PerAPILimitEnabled {
+       return NewPerAPICallLimiter(cfg.ListMetrics, cfg.GetMetricData, cfg.GetMetricStatistics)
+   }
+   return NewSingleLimiter(cfg.SingleLimit)
+}
+
+// perAPICallLimiter is a ConcurrencyLimiter that keeps a separate concurrency limiter per API call. This allows
+// more granular control of concurrency, letting us take advantage of the different API limits. For example,
+// ListMetrics has a limit of 25 TPS, while GetMetricData has none.
+type perAPICallLimiter struct {
+   listMetricsLimiter          semaphore
+   getMetricsDataLimiter       semaphore
+   getMetricsStatisticsLimiter semaphore
+}
+
+// NewPerAPICallLimiter creates a new ConcurrencyLimiter with a separate limit per API call.
+func NewPerAPICallLimiter(listMetrics, getMetricData, getMetricStatistics int) ConcurrencyLimiter {
+   return &perAPICallLimiter{
+       listMetricsLimiter:          newSemaphore(listMetrics),
+       getMetricsDataLimiter:       newSemaphore(getMetricData),
+       getMetricsStatisticsLimiter: newSemaphore(getMetricStatistics),
+   }
+}
+
+func (l *perAPICallLimiter) Acquire(op string) {
+   switch op {
+   case listMetricsCall:
+       l.listMetricsLimiter.Acquire()
+   case getMetricDataCall:
+       l.getMetricsDataLimiter.Acquire()
+   case getMetricStatisticsCall:
+       l.getMetricsStatisticsLimiter.Acquire()
+   }
+}
+
+func (l *perAPICallLimiter) Release(op string) {
+   switch op {
+   case listMetricsCall:
+       l.listMetricsLimiter.Release()
+   case getMetricDataCall:
+       l.getMetricsDataLimiter.Release()
+   case getMetricStatisticsCall:
+       l.getMetricsStatisticsLimiter.Release()
+   }
+}
+
+// singleLimiter is a ConcurrencyLimiter that keeps a single shared limit for all API calls, preserving the
+// previous behaviour.
+type singleLimiter struct {
+   s semaphore
+}
+
+// NewSingleLimiter creates a new ConcurrencyLimiter with a single shared limit.
+func NewSingleLimiter(limit int) ConcurrencyLimiter {
+   return &singleLimiter{
+       s: newSemaphore(limit),
+   }
+}
+
+func (sl *singleLimiter) Acquire(_ string) {
+   sl.s.Acquire()
+}
+
+func (sl *singleLimiter) Release(_ string) {
+   sl.s.Release()
+}
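For reference, a minimal sketch (not part of this patch) of how the new limiter is meant to be driven, assuming the package path above; the operation strings match the `listMetricsCall`/`getMetricDataCall`/`getMetricStatisticsCall` constants in client.go, and the limit values are arbitrary:

```go
package main

import (
	"fmt"

	"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
)

func main() {
	// With PerAPILimitEnabled set, NewLimiter returns the per-API-call implementation;
	// otherwise it falls back to the single shared semaphore.
	cfg := cloudwatch.ConcurrencyConfig{
		PerAPILimitEnabled:  true,
		ListMetrics:         2,
		GetMetricData:       10,
		GetMetricStatistics: 5,
	}
	limiter := cfg.NewLimiter()

	// Same semantics as a semaphore, keyed by the CloudWatch operation name.
	limiter.Acquire("ListMetrics")
	fmt.Println("acquired a ListMetrics slot; the API call would run here")
	limiter.Release("ListMetrics")
}
```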
diff --git a/pkg/clients/factory.go b/pkg/clients/factory.go
index d7080e4eb..bfa1b938a 100644
--- a/pkg/clients/factory.go
+++ b/pkg/clients/factory.go
@@ -10,7 +10,7 @@ import (
// Factory is an interface to abstract away all logic required to produce the different
// YACE specific clients which wrap AWS clients
type Factory interface {
-   GetCloudwatchClient(region string, role config.Role, concurrencyLimit int) cloudwatch_client.Client
+   GetCloudwatchClient(region string, role config.Role, concurrency cloudwatch_client.ConcurrencyConfig) cloudwatch_client.Client
    GetTaggingClient(region string, role config.Role, concurrencyLimit int) tagging.Client
    GetAccountClient(region string, role config.Role) account.Client
}
diff --git a/pkg/clients/v1/factory.go b/pkg/clients/v1/factory.go
index 8c4800a55..0993bef19 100644
--- a/pkg/clients/v1/factory.go
+++ b/pkg/clients/v1/factory.go
@@ -246,17 +246,17 @@ func createAccountClient(logger logging.Logger, sts stsiface.STSAPI) account.Cli
    return account_v1.NewClient(logger, sts)
}
-func (c *CachingFactory) GetCloudwatchClient(region string, role config.Role, concurrencyLimit int) cloudwatch_client.Client {
+func (c *CachingFactory) GetCloudwatchClient(region string, role config.Role, concurrency cloudwatch_client.ConcurrencyConfig) cloudwatch_client.Client {
    if !c.refreshed {
        // if we have not refreshed then we need to lock in case we are accessing concurrently
        c.mu.Lock()
        defer c.mu.Unlock()
    }
    if client := c.clients[role][region].cloudwatch; client != nil {
-       return cloudwatch_client.NewLimitedConcurrencyClient(client, concurrencyLimit)
+       return cloudwatch_client.NewLimitedConcurrencyClient(client, concurrency.NewLimiter())
    }
    c.clients[role][region].cloudwatch = createCloudWatchClient(c.logger, c.session, &region, role, c.fips)
-   return cloudwatch_client.NewLimitedConcurrencyClient(c.clients[role][region].cloudwatch, concurrencyLimit)
+   return cloudwatch_client.NewLimitedConcurrencyClient(c.clients[role][region].cloudwatch, concurrency.NewLimiter())
}
func (c *CachingFactory) GetTaggingClient(region string, role config.Role, concurrencyLimit int) tagging.Client {
diff --git a/pkg/clients/v1/factory_test.go b/pkg/clients/v1/factory_test.go
index 37997eccc..fdaeeec26 100644
--- a/pkg/clients/v1/factory_test.go
+++ b/pkg/clients/v1/factory_test.go
@@ -13,6 +13,7 @@ import (
    "github.com/aws/aws-sdk-go/service/sts/stsiface"
    "github.com/stretchr/testify/require"
+   "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
)
@@ -697,7 +698,7 @@ func TestClientCacheGetCloudwatchClient(t *testing.T) {
    testGetAWSClient(
        t, "Cloudwatch",
        func(t *testing.T, cache *CachingFactory, region string, role config.Role) {
-           iface := cache.GetCloudwatchClient(region, role, 1)
+           iface := cache.GetCloudwatchClient(region, role, cloudwatch.ConcurrencyConfig{SingleLimit: 1})
            if iface == nil {
                t.Fail()
                return
diff --git a/pkg/clients/v2/factory.go b/pkg/clients/v2/factory.go
index 1c71faa5f..169435304 100644
--- a/pkg/clients/v2/factory.go
+++ b/pkg/clients/v2/factory.go
@@ -149,17 +149,17 @@ func NewFactory(cfg config.ScrapeConf, fips bool, logger logging.Logger) (*Cachi
    }, nil
}
-func (c *CachingFactory) GetCloudwatchClient(region string, role config.Role, concurrencyLimit int) cloudwatch_client.Client {
+func (c *CachingFactory) GetCloudwatchClient(region string, role config.Role, concurrency cloudwatch_client.ConcurrencyConfig) cloudwatch_client.Client {
    if !c.refreshed {
        // if we have not refreshed then we need to lock in case we are accessing concurrently
        c.mu.Lock()
        defer c.mu.Unlock()
    }
    if client := c.clients[role][region].cloudwatch; client != nil {
-       return cloudwatch_client.NewLimitedConcurrencyClient(client, concurrencyLimit)
+       return cloudwatch_client.NewLimitedConcurrencyClient(client, concurrency.NewLimiter())
    }
    c.clients[role][region].cloudwatch = cloudwatch_v2.NewClient(c.logger, c.createCloudwatchClient(c.clients[role][region].awsConfig))
-   return cloudwatch_client.NewLimitedConcurrencyClient(c.clients[role][region].cloudwatch, concurrencyLimit)
+   return cloudwatch_client.NewLimitedConcurrencyClient(c.clients[role][region].cloudwatch, concurrency.NewLimiter())
}
func (c *CachingFactory) GetTaggingClient(region string, role config.Role, concurrencyLimit int) tagging.Client {
diff --git a/pkg/clients/v2/factory_test.go b/pkg/clients/v2/factory_test.go
index 88c1d4c18..82516c8fe 100644
--- a/pkg/clients/v2/factory_test.go
+++ b/pkg/clients/v2/factory_test.go
@@ -289,7 +289,7 @@ func TestClientCache_GetCloudwatchClient(t *testing.T) {
        clients := output.clients[defaultRole]["region1"]
        require.NotNil(t, clients)
        // Can't do equality comparison due to concurrency limiter
-       assert.NotNil(t, output.GetCloudwatchClient("region1", defaultRole, 1))
+       assert.NotNil(t, output.GetCloudwatchClient("region1", defaultRole, cloudwatch_client.ConcurrencyConfig{SingleLimit: 1}))
    })
    t.Run("unrefreshed cache creates a new client", func(t *testing.T) {
@@ -312,7 +312,7 @@ func TestClientCache_GetCloudwatchClient(t *testing.T) {
        require.NotNil(t, clients)
        require.Nil(t, clients.cloudwatch)
-       output.GetCloudwatchClient("region1", defaultRole, 1)
+       output.GetCloudwatchClient("region1", defaultRole, cloudwatch_client.ConcurrencyConfig{SingleLimit: 1})
        assert.NotNil(t, clients.cloudwatch)
    })
}
diff --git a/pkg/exporter.go b/pkg/exporter.go
index 68a2c84d2..749b88c4e 100644
--- a/pkg/exporter.go
+++ b/pkg/exporter.go
@@ -7,6 +7,7 @@ import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients"
+   "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job"
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
@@ -30,22 +31,31 @@ var Metrics = []prometheus.Collector{
}
const (
-   DefaultMetricsPerQuery          = 500
-   DefaultLabelsSnakeCase          = false
-   DefaultCloudWatchAPIConcurrency = 5
-   DefaultTaggingAPIConcurrency    = 5
+   DefaultMetricsPerQuery       = 500
+   DefaultLabelsSnakeCase       = false
+   DefaultTaggingAPIConcurrency = 5
)
+var DefaultCloudwatchConcurrency = cloudwatch.ConcurrencyConfig{
+   SingleLimit:        5,
+   PerAPILimitEnabled: false,
+
+   // If PerAPILimitEnabled is set, each per-API limit defaults to the same value as the single limit.
+   ListMetrics:         5,
+   GetMetricData:       5,
+   GetMetricStatistics: 5,
+}
+
// featureFlagsMap is a map that contains the enabled feature flags. If a key is not present, it means the feature flag
// is disabled.
type featureFlagsMap map[string]struct{}
type options struct {
-   metricsPerQuery          int
-   labelsSnakeCase          bool
-   cloudWatchAPIConcurrency int
-   taggingAPIConcurrency    int
-   featureFlags             featureFlagsMap
+   metricsPerQuery       int
+   labelsSnakeCase       bool
+   taggingAPIConcurrency int
+   featureFlags          featureFlagsMap
+   cloudwatchConcurrency cloudwatch.ConcurrencyConfig
}
// IsFeatureEnabled implements the FeatureFlags interface, allowing us to inject the options-configure feature flags in the rest of the code.
@@ -80,7 +90,27 @@ func CloudWatchAPIConcurrency(maxConcurrency int) OptionsFunc {
        return fmt.Errorf("CloudWatchAPIConcurrency must be a positive value")
    }
-   o.cloudWatchAPIConcurrency = maxConcurrency
+   o.cloudwatchConcurrency.SingleLimit = maxConcurrency
    return nil
    }
}
+func CloudWatchPerAPILimitConcurrency(listMetrics, getMetricData, getMetricStatistics int) OptionsFunc {
+   return func(o *options) error {
+       if listMetrics <= 0 {
+           return fmt.Errorf("ListMetrics concurrency limit must be a positive value")
+       }
+       if getMetricData <= 0 {
+           return fmt.Errorf("GetMetricData concurrency limit must be a positive value")
+       }
+       if getMetricStatistics <= 0 {
+           return fmt.Errorf("GetMetricStatistics concurrency limit must be a positive value")
+       }
+
+       o.cloudwatchConcurrency.PerAPILimitEnabled = true
+       o.cloudwatchConcurrency.ListMetrics = listMetrics
+       o.cloudwatchConcurrency.GetMetricData = getMetricData
+       o.cloudwatchConcurrency.GetMetricStatistics = getMetricStatistics
+       return nil
+   }
+}
@@ -108,11 +138,11 @@ func EnableFeatureFlag(flags ...string) OptionsFunc {
func defaultOptions() options {
    return options{
-       metricsPerQuery:          DefaultMetricsPerQuery,
-       labelsSnakeCase:          DefaultLabelsSnakeCase,
-       cloudWatchAPIConcurrency: DefaultCloudWatchAPIConcurrency,
-       taggingAPIConcurrency:    DefaultTaggingAPIConcurrency,
-       featureFlags:             make(featureFlagsMap),
+       metricsPerQuery:       DefaultMetricsPerQuery,
+       labelsSnakeCase:       DefaultLabelsSnakeCase,
+       taggingAPIConcurrency: DefaultTaggingAPIConcurrency,
+       featureFlags:          make(featureFlagsMap),
+       cloudwatchConcurrency: DefaultCloudwatchConcurrency,
    }
}
@@ -154,7 +184,7 @@ func UpdateMetrics(
        cfg,
        factory,
        options.metricsPerQuery,
-       options.cloudWatchAPIConcurrency,
+       options.cloudwatchConcurrency,
        options.taggingAPIConcurrency,
    )
diff --git a/pkg/job/discovery.go b/pkg/job/discovery.go
index 162f186e9..8ba5957fb 100644
--- a/pkg/job/discovery.go
+++ b/pkg/job/discovery.go
@@ -33,7 +33,7 @@ func runDiscoveryJob(
    clientTag tagging.Client,
    clientCloudwatch cloudwatch.Client,
    metricsPerQuery int,
-   concurrencyLimit int,
+   cloudwatchConcurrency cloudwatch.ConcurrencyConfig,
) ([]*model.TaggedResource, []*model.CloudwatchData) {
    logger.Debug("Get tagged resources")
@@ -67,7 +67,8 @@ func runDiscoveryJob(
    logger.Debug("GetMetricData partitions", "size", partitionSize)
    g, gCtx := errgroup.WithContext(ctx)
-   g.SetLimit(concurrencyLimit)
+   // TODO: double check that limiting on the GetMetricData concurrency here works for both the per-API and single limiter configurations
+   g.SetLimit(cloudwatchConcurrency.GetMetricData)
    mu := sync.Mutex{}
    getMetricDataOutput := make([][]cloudwatch.MetricDataResult, 0, partitionSize)
diff --git a/pkg/job/scrape.go b/pkg/job/scrape.go
index 773c35613..95881fa03 100644
--- a/pkg/job/scrape.go
+++ b/pkg/job/scrape.go
@@ -5,6 +5,7 @@ import (
    "sync"
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients"
+   "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
    "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging" "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model" @@ -16,7 +17,7 @@ func ScrapeAwsData( cfg config.ScrapeConf, factory clients.Factory, metricsPerQuery int, - cloudWatchAPIConcurrency int, + cloudwatchConcurrency cloudwatch.ConcurrencyConfig, taggingAPIConcurrency int, ) ([][]*model.TaggedResource, []model.CloudwatchMetricResult) { mux := &sync.Mutex{} @@ -38,7 +39,7 @@ func ScrapeAwsData( } jobLogger = jobLogger.With("account", accountID) - resources, metrics := runDiscoveryJob(ctx, jobLogger, discoveryJob, region, cfg.Discovery.ExportedTagsOnMetrics, factory.GetTaggingClient(region, role, taggingAPIConcurrency), factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency), metricsPerQuery, cloudWatchAPIConcurrency) + resources, metrics := runDiscoveryJob(ctx, jobLogger, discoveryJob, region, cfg.Discovery.ExportedTagsOnMetrics, factory.GetTaggingClient(region, role, taggingAPIConcurrency), factory.GetCloudwatchClient(region, role, cloudwatchConcurrency), metricsPerQuery, cloudwatchConcurrency) metricResult := model.CloudwatchMetricResult{ Context: &model.JobContext{ Region: region, @@ -77,7 +78,7 @@ func ScrapeAwsData( } jobLogger = jobLogger.With("account", accountID) - metrics := runStaticJob(ctx, jobLogger, staticJob, factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency)) + metrics := runStaticJob(ctx, jobLogger, staticJob, factory.GetCloudwatchClient(region, role, cloudwatchConcurrency)) metricResult := model.CloudwatchMetricResult{ Context: &model.JobContext{ Region: region, @@ -108,7 +109,7 @@ func ScrapeAwsData( } jobLogger = jobLogger.With("account", accountID) - metrics := runCustomNamespaceJob(ctx, jobLogger, customNamespaceJob, factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency), metricsPerQuery) + metrics := runCustomNamespaceJob(ctx, jobLogger, customNamespaceJob, factory.GetCloudwatchClient(region, role, cloudwatchConcurrency), metricsPerQuery) metricResult := model.CloudwatchMetricResult{ Context: &model.JobContext{ Region: region,