Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new concurrency limit mechanism for the CloudWatch client #1107

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
**Bugfixes and features**

Features:
* ...
* Add new CloudWatch API concurrency limiter by @thepalbi

Bugs:
* ...
Expand Down
36 changes: 33 additions & 3 deletions cmd/yace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"golang.org/x/sync/semaphore"

exporter "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
v1 "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/v1"
v2 "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/v2"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
Expand Down Expand Up @@ -46,7 +47,7 @@ var (
debug bool
logFormat string
fips bool
cloudwatchConcurrency int
cloudwatchConcurrency cloudwatch.ConcurrencyConfig
tagConcurrency int
scrapingInterval int
metricsPerQuery int
Expand Down Expand Up @@ -126,9 +127,33 @@ func NewYACEApp() *cli.App {
},
&cli.IntFlag{
Name: "cloudwatch-concurrency",
Value: exporter.DefaultCloudWatchAPIConcurrency,
Value: exporter.DefaultCloudwatchConcurrency.SingleLimit,
Usage: "Maximum number of concurrent requests to CloudWatch API.",
Destination: &cloudwatchConcurrency,
Destination: &cloudwatchConcurrency.SingleLimit,
},
&cli.BoolFlag{
Name: "cloudwatch-concurrency.per-api-enabled",
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
Value: exporter.DefaultCloudwatchConcurrency.PerAPIEnabled,
Usage: "Whether to enable the per API CloudWatch concurrency limiter. When enabled, the concurrency `-cloudwatch-concurrency` flag will be ignored.",
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
Destination: &cloudwatchConcurrency.PerAPIEnabled,
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
},
&cli.IntFlag{
Name: "cloudwatch-concurrency.list-metrics-limit",
Value: exporter.DefaultCloudwatchConcurrency.ListMetrics,
Usage: "Maximum number of concurrent requests to ListMetrics CloudWatch API. Used if the per-api concurrency limiter is enabled.",
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
Destination: &cloudwatchConcurrency.ListMetrics,
},
&cli.IntFlag{
Name: "cloudwatch-concurrency.get-metric-data-limit",
Value: exporter.DefaultCloudwatchConcurrency.GetMetricData,
Usage: "Maximum number of concurrent requests to GetMetricData CloudWatch API. Used if the per-api concurrency limiter is enabled.",
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
Destination: &cloudwatchConcurrency.GetMetricData,
},
&cli.IntFlag{
Name: "cloudwatch-concurrency.get-metric-statistics-limit",
Value: exporter.DefaultCloudwatchConcurrency.GetMetricStatistics,
Usage: "Maximum number of concurrent requests to GetMetricStatistics CloudWatch API. Used if the per-api concurrency limiter is enabled.",
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
Destination: &cloudwatchConcurrency.GetMetricStatistics,
},
&cli.IntFlag{
Name: "tag-concurrency",
Expand Down Expand Up @@ -208,6 +233,11 @@ func NewYACEApp() *cli.App {
func startScraper(c *cli.Context) error {
logger = logging.NewLogger(logFormat, debug, "version", version)

// log warning if the two concurrency limiting methods are configured via CLI
if c.IsSet("cloudwatch-concurrency") && c.IsSet("cloudwatch-concurrency.per-api-enabled") {
logger.Warn("Both `cloudwatch-concurrency` and `cloudwatch-concurrency.per-api-enabled` are set. `cloudwatch-concurrency` will be ignored, and the per-api concurrency limiting strategy will be favoured.")
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
}

logger.Info("Parsing config")
if err := cfg.Load(configFile, logger); err != nil {
return fmt.Errorf("Couldn't read %s: %w", configFile, err)
Expand Down
20 changes: 15 additions & 5 deletions cmd/yace/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,27 @@ func (s *scraper) scrape(ctx context.Context, logger logging.Logger, cache cachi
// clearing, so we always clear credentials before the next scrape
cache.Refresh()
defer cache.Clear()

options := []exporter.OptionsFunc{
exporter.MetricsPerQuery(metricsPerQuery),
exporter.LabelsSnakeCase(labelsSnakeCase),
exporter.EnableFeatureFlag(s.featureFlags...),
exporter.TaggingAPIConcurrency(tagConcurrency),
}

if cloudwatchConcurrency.PerAPIEnabled {
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
options = append(options, exporter.CloudWatchPerAPIConcurrency(cloudwatchConcurrency.ListMetrics, cloudwatchConcurrency.GetMetricData, cloudwatchConcurrency.GetMetricStatistics))
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
} else {
options = append(options, exporter.CloudWatchAPIConcurrency(cloudwatchConcurrency.SingleLimit))
}

err := exporter.UpdateMetrics(
ctx,
logger,
cfg,
newRegistry,
cache,
exporter.MetricsPerQuery(metricsPerQuery),
exporter.LabelsSnakeCase(labelsSnakeCase),
exporter.EnableFeatureFlag(s.featureFlags...),
exporter.CloudWatchAPIConcurrency(cloudwatchConcurrency),
exporter.TaggingAPIConcurrency(tagConcurrency),
options...,
)
if err != nil {
logger.Error(err, "error updating metrics")
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ All flags may be prefixed with either one hypen or two (i.e., both `-config.file
| `-debug` | Log at debug level | `false` |
| `-fips` | Use FIPS compliant AWS API | `false` |
| `-cloudwatch-concurrency` | Maximum number of concurrent requests to CloudWatch API | `5` |
| `-cloudwatch-concurrency.per-api-enabled` | Enables a concurrency limiter, that has a specific limit per CloudWatch API call. | `false` |
| `-cloudwatch-concurrency.list-metrics-limit` | Maximum number of concurrent requests to CloudWatch `ListMetrics` API. Only applicable if `per-api-enabled` is `true`. | `5` |
| `-cloudwatch-concurrency.get-metric-data-limit` | Maximum number of concurrent requests to CloudWatch `GetMetricsData` API. Only applicable if `per-api-enabled` is `true`. | `5` |
| `-cloudwatch-concurrency.get-metric-statistics-limit` | Maximum number of concurrent requests to CloudWatch `GetMetricStatistics` API. Only applicable if `per-api-enabled` is `true`. | `5` |
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
| `-tag-concurrency` | Maximum number of concurrent requests to Resource Tagging API | `5` |
| `-scraping-interval` | Seconds to wait between scraping the AWS metrics | `300` |
| `-metrics-per-query` | Number of metrics made in a single GetMetricsData request | `500` |
Expand Down
42 changes: 31 additions & 11 deletions pkg/clients/cloudwatch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

const (
listMetricsCall = "ListMetrics"
getMetricDataCall = "GetMetricData"
getMetricStatisticsCall = "GetMetricStatistics"
)

type Client interface {
// ListMetrics returns the list of metrics and dimensions for a given namespace
// and metric name. Results pagination is handled automatically: the caller can
Expand All @@ -23,41 +29,55 @@ type Client interface {
GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *config.Metric) []*model.Datapoint
}

// ConcurrencyLimiter limits the concurrency when calling AWS CloudWatch APIs. The functions implemented
// by this interface follow the same as a normal semaphore, but accept and operation identifier. Some
// implementations might use this to keep a different semaphore, with different reentrance values, per
// operation.
type ConcurrencyLimiter interface {
// Acquire takes one "ticket" from the concurrency limiter for op. If there's none available, the caller
// routine will be blocked until there's room available.
Acquire(op string)

// Release gives back one "ticket" to the concurrency limiter identified by op. If there's one or more
// routines waiting for one, one will be woken up.
Release(op string)
}

type MetricDataResult struct {
ID string
Datapoint float64
Timestamp time.Time
}

type limitedConcurrencyClient struct {
client Client
sem chan struct{}
client Client
limiter ConcurrencyLimiter
}

func NewLimitedConcurrencyClient(client Client, maxConcurrency int) Client {
func NewLimitedConcurrencyClient(client Client, limiter ConcurrencyLimiter) Client {
return &limitedConcurrencyClient{
client: client,
sem: make(chan struct{}, maxConcurrency),
client: client,
limiter: limiter,
}
}

func (c limitedConcurrencyClient) GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *config.Metric) []*model.Datapoint {
c.sem <- struct{}{}
c.limiter.Acquire(getMetricStatisticsCall)
res := c.client.GetMetricStatistics(ctx, logger, dimensions, namespace, metric)
<-c.sem
c.limiter.Release(getMetricStatisticsCall)
return res
}

func (c limitedConcurrencyClient) GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []MetricDataResult {
c.sem <- struct{}{}
c.limiter.Acquire(getMetricDataCall)
res := c.client.GetMetricData(ctx, logger, getMetricData, namespace, length, delay, configuredRoundingPeriod)
<-c.sem
c.limiter.Release(getMetricDataCall)
return res
}

func (c limitedConcurrencyClient) ListMetrics(ctx context.Context, namespace string, metric *config.Metric, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error) {
c.sem <- struct{}{}
c.limiter.Acquire(listMetricsCall)
res, err := c.client.ListMetrics(ctx, namespace, metric, recentlyActiveOnly, fn)
<-c.sem
c.limiter.Release(listMetricsCall)
return res, err
}
105 changes: 105 additions & 0 deletions pkg/clients/cloudwatch/concurrency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package cloudwatch

// ConcurrencyConfig configures how concurrency should be limited in a Cloudwatch API client. It allows
// one to pick between different limiter implementations: a single limit limiter, or one with a different limit per
// API call.
type ConcurrencyConfig struct {
// PerAPIEnabled configures wether to have a limit per API call.
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
PerAPIEnabled bool
thepalbi marked this conversation as resolved.
Show resolved Hide resolved

// SingleLimit configures the concurrency limit when using a single limiter for api calls.
SingleLimit int

// ListMetrics limits the number for ListMetrics API concurrent API calls.
ListMetrics int

// GetMetricData limits the number for GetMetricData API concurrent API calls.
GetMetricData int

// GetMetricStatistics limits the number for GetMetricStatistics API concurrent API calls.
GetMetricStatistics int
}

// semaphore implements a simple semaphore using a channel.
type semaphore chan struct{}

// newSemaphore creates a new semaphore with the given limit.
func newSemaphore(limit int) semaphore {
return make(semaphore, limit)
}

func (s semaphore) Acquire() {
s <- struct{}{}
}

func (s semaphore) Release() {
<-s
}

// NewLimiter creates a new ConcurrencyLimiter, according to the ConcurrencyConfig.
func (cfg ConcurrencyConfig) NewLimiter() ConcurrencyLimiter {
if cfg.PerAPIEnabled {
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
return NewPerAPICallLimiter(cfg.ListMetrics, cfg.GetMetricData, cfg.GetMetricStatistics)
}
return NewSingleLimiter(cfg.SingleLimit)
}

// perAPICallLimiter is a ConcurrencyLimiter that keeps a different concurrency limiter per different API call. This allows
// a more granular control of concurrency, allowing us to take advantage of different api limits. For example, ListMetrics
// has a limit of 25 TPS, while GetMetricData has none.
type perAPICallLimiter struct {
listMetricsLimiter semaphore
getMetricsDataLimiter semaphore
getMetricsStatisticsLimiter semaphore
}

// NewPerAPICallLimiter creates a new PerAPICallLimiter.
func NewPerAPICallLimiter(listMetrics, getMetricData, getMetricStatistics int) ConcurrencyLimiter {
return &perAPICallLimiter{
listMetricsLimiter: newSemaphore(listMetrics),
getMetricsDataLimiter: newSemaphore(getMetricData),
getMetricsStatisticsLimiter: newSemaphore(getMetricStatistics),
}
}

func (l *perAPICallLimiter) Acquire(op string) {
switch op {
case listMetricsCall:
l.listMetricsLimiter.Acquire()
case getMetricDataCall:
l.getMetricsDataLimiter.Acquire()
case getMetricStatisticsCall:
l.getMetricsStatisticsLimiter.Acquire()
}
}

func (l *perAPICallLimiter) Release(op string) {
switch op {
case listMetricsCall:
l.listMetricsLimiter.Release()
case getMetricDataCall:
l.getMetricsDataLimiter.Release()
case getMetricStatisticsCall:
l.getMetricsStatisticsLimiter.Release()
}
}

// singleLimiter is the current implementation of ConcurrencyLimiter, which has a single limit for all different API calls.
type singleLimiter struct {
s semaphore
}

// NewSingleLimiter creates a new SingleLimiter.
func NewSingleLimiter(limit int) ConcurrencyLimiter {
return &singleLimiter{
s: newSemaphore(limit),
}
}

func (sl *singleLimiter) Acquire(_ string) {
sl.s.Acquire()
}

func (sl *singleLimiter) Release(_ string) {
sl.s.Release()
}
2 changes: 1 addition & 1 deletion pkg/clients/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// Factory is an interface to abstract away all logic required to produce the different
// YACE specific clients which wrap AWS clients
type Factory interface {
GetCloudwatchClient(region string, role config.Role, concurrencyLimit int) cloudwatch_client.Client
GetCloudwatchClient(region string, role config.Role, concurrency cloudwatch_client.ConcurrencyConfig) cloudwatch_client.Client
GetTaggingClient(region string, role config.Role, concurrencyLimit int) tagging.Client
GetAccountClient(region string, role config.Role) account.Client
}
6 changes: 3 additions & 3 deletions pkg/clients/v1/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,17 @@ func createAccountClient(logger logging.Logger, sts stsiface.STSAPI) account.Cli
return account_v1.NewClient(logger, sts)
}

func (c *CachingFactory) GetCloudwatchClient(region string, role config.Role, concurrencyLimit int) cloudwatch_client.Client {
func (c *CachingFactory) GetCloudwatchClient(region string, role config.Role, concurrency cloudwatch_client.ConcurrencyConfig) cloudwatch_client.Client {
if !c.refreshed {
// if we have not refreshed then we need to lock in case we are accessing concurrently
c.mu.Lock()
defer c.mu.Unlock()
}
if client := c.clients[role][region].cloudwatch; client != nil {
return cloudwatch_client.NewLimitedConcurrencyClient(client, concurrencyLimit)
return cloudwatch_client.NewLimitedConcurrencyClient(client, concurrency.NewLimiter())
}
c.clients[role][region].cloudwatch = createCloudWatchClient(c.logger, c.session, &region, role, c.fips)
return cloudwatch_client.NewLimitedConcurrencyClient(c.clients[role][region].cloudwatch, concurrencyLimit)
return cloudwatch_client.NewLimitedConcurrencyClient(c.clients[role][region].cloudwatch, concurrency.NewLimiter())
}

func (c *CachingFactory) GetTaggingClient(region string, role config.Role, concurrencyLimit int) tagging.Client {
Expand Down
3 changes: 2 additions & 1 deletion pkg/clients/v1/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/aws/aws-sdk-go/service/sts/stsiface"
"github.com/stretchr/testify/require"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
)
Expand Down Expand Up @@ -697,7 +698,7 @@ func TestClientCacheGetCloudwatchClient(t *testing.T) {
testGetAWSClient(
t, "Cloudwatch",
func(t *testing.T, cache *CachingFactory, region string, role config.Role) {
iface := cache.GetCloudwatchClient(region, role, 1)
iface := cache.GetCloudwatchClient(region, role, cloudwatch.ConcurrencyConfig{SingleLimit: 1})
if iface == nil {
t.Fail()
return
Expand Down
6 changes: 3 additions & 3 deletions pkg/clients/v2/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,17 @@ func NewFactory(cfg config.ScrapeConf, fips bool, logger logging.Logger) (*Cachi
}, nil
}

func (c *CachingFactory) GetCloudwatchClient(region string, role config.Role, concurrencyLimit int) cloudwatch_client.Client {
func (c *CachingFactory) GetCloudwatchClient(region string, role config.Role, concurrency cloudwatch_client.ConcurrencyConfig) cloudwatch_client.Client {
if !c.refreshed {
// if we have not refreshed then we need to lock in case we are accessing concurrently
c.mu.Lock()
defer c.mu.Unlock()
}
if client := c.clients[role][region].cloudwatch; client != nil {
return cloudwatch_client.NewLimitedConcurrencyClient(client, concurrencyLimit)
return cloudwatch_client.NewLimitedConcurrencyClient(client, concurrency.NewLimiter())
}
c.clients[role][region].cloudwatch = cloudwatch_v2.NewClient(c.logger, c.createCloudwatchClient(c.clients[role][region].awsConfig))
return cloudwatch_client.NewLimitedConcurrencyClient(c.clients[role][region].cloudwatch, concurrencyLimit)
return cloudwatch_client.NewLimitedConcurrencyClient(c.clients[role][region].cloudwatch, concurrency.NewLimiter())
}

func (c *CachingFactory) GetTaggingClient(region string, role config.Role, concurrencyLimit int) tagging.Client {
Expand Down
4 changes: 2 additions & 2 deletions pkg/clients/v2/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestClientCache_GetCloudwatchClient(t *testing.T) {
clients := output.clients[defaultRole]["region1"]
require.NotNil(t, clients)
// Can't do equality comparison due to concurrency limiter
assert.NotNil(t, output.GetCloudwatchClient("region1", defaultRole, 1))
assert.NotNil(t, output.GetCloudwatchClient("region1", defaultRole, cloudwatch_client.ConcurrencyConfig{SingleLimit: 1}))
})

t.Run("unrefreshed cache creates a new client", func(t *testing.T) {
Expand All @@ -312,7 +312,7 @@ func TestClientCache_GetCloudwatchClient(t *testing.T) {
require.NotNil(t, clients)
require.Nil(t, clients.cloudwatch)

output.GetCloudwatchClient("region1", defaultRole, 1)
output.GetCloudwatchClient("region1", defaultRole, cloudwatch_client.ConcurrencyConfig{SingleLimit: 1})
assert.NotNil(t, clients.cloudwatch)
})
}
Expand Down
Loading