From b8a5d64df415a85b4877f52ec9357682c556ea03 Mon Sep 17 00:00:00 2001 From: geigerj0 <112163019+geigerj0@users.noreply.github.com> Date: Fri, 26 Apr 2024 15:23:52 +0200 Subject: [PATCH] fix(eventgenerator): use PromQL API for metric types `throughput` and `responsetime` (#2890) --- src/acceptance/app/dynamic_policy_test.go | 79 ++++- src/acceptance/helpers/helpers.go | 46 +++ .../envelopeprocessor/envelope_processor.go | 20 +- .../envelopeprocessor_test.go | 104 +------ .../aggregator/metric_poller.go | 2 +- .../eventgenerator/client/log_cache_client.go | 159 +++++++--- .../client/log_cache_client_test.go | 271 +++++++++++++++--- .../client/metric_client_factory.go | 4 +- .../client/metric_client_factory_test.go | 5 +- .../client/metric_server_client.go | 2 +- .../cmd/eventgenerator/eventgenerator_test.go | 4 +- 11 files changed, 477 insertions(+), 219 deletions(-) diff --git a/src/acceptance/app/dynamic_policy_test.go b/src/acceptance/app/dynamic_policy_test.go index 1d2b181997..8d0da7c6c1 100644 --- a/src/acceptance/app/dynamic_policy_test.go +++ b/src/acceptance/app/dynamic_policy_test.go @@ -4,6 +4,7 @@ import ( "acceptance" . "acceptance/helpers" "fmt" + "sync" "time" cfh "github.com/cloudfoundry/cf-test-helpers/v2/helpers" @@ -19,6 +20,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { doneAcceptChan chan bool ticker *time.Ticker maxHeapLimitMb int + wg sync.WaitGroup ) const minimalMemoryUsage = 28 // observed minimal memory usage by the test app @@ -30,6 +32,8 @@ var _ = Describe("AutoScaler dynamic policy", func() { Expect(err).NotTo(HaveOccurred()) StartApp(appName, cfg.CfPushTimeoutDuration()) instanceName = CreatePolicy(cfg, appName, appGUID, policy) + + wg = sync.WaitGroup{} }) BeforeEach(func() { maxHeapLimitMb = cfg.NodeMemoryLimit - minimalMemoryUsage @@ -105,12 +109,14 @@ var _ = Describe("AutoScaler dynamic policy", func() { Context("when responsetime is greater than scaling out threshold", func() { BeforeEach(func() { - policy = GenerateDynamicScaleOutPolicy(1, 2, "responsetime", 2000) + policy = GenerateDynamicScaleOutPolicy(1, 2, "responsetime", 800) initialInstanceCount = 1 }) JustBeforeEach(func() { - ticker = time.NewTicker(10 * time.Second) + // simulate ongoing ~20 slow requests per second + ticker = time.NewTicker(50 * time.Millisecond) + appUri := cfh.AppUri(appName, "/responsetime/slow/1000", cfg) go func(chan bool) { defer GinkgoRecover() for { @@ -120,7 +126,11 @@ var _ = Describe("AutoScaler dynamic policy", func() { doneAcceptChan <- true return case <-ticker.C: - cfh.CurlApp(cfg, appName, "/responsetime/slow/3000", "-f") + wg.Add(1) + go func() { + defer wg.Done() + cfh.Curl(cfg, appUri) + }() } } }(doneChan) @@ -129,17 +139,25 @@ var _ = Describe("AutoScaler dynamic policy", func() { It("should scale out", Label(acceptance.LabelSmokeTests), func() { WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) }) + + AfterEach(func() { + // ginkgo requires all go-routines to be finished before reporting the result. + // let's wait for all curl executing go-routines to return. 
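+			// each /responsetime/slow/1000 request takes about a second while ticks
+			// arrive every 50 ms, so roughly 20 curls are in flight at any moment.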
+			wg.Wait()
+		})
 	})

 	Context("when responsetime is less than scaling in threshold", func() {
 		BeforeEach(func() {
-			policy = GenerateDynamicScaleInPolicy(1, 2, "responsetime", 1000)
+			policy = GenerateDynamicScaleInPolicyBetween("responsetime", 800, 1200)
 			initialInstanceCount = 2
 		})

 		JustBeforeEach(func() {
-			ticker = time.NewTicker(2 * time.Second)
+			// simulate ongoing ~20 slow requests per second
+			ticker = time.NewTicker(50 * time.Millisecond)
+			appUri := cfh.AppUri(appName, "/responsetime/slow/1000", cfg)
 			go func(chan bool) {
 				defer GinkgoRecover()
 				for {
@@ -149,7 +167,11 @@ var _ = Describe("AutoScaler dynamic policy", func() {
 						doneAcceptChan <- true
 						return
 					case <-ticker.C:
-						cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
+						wg.Add(1)
+						go func() {
+							defer wg.Done()
+							cfh.Curl(cfg, appUri)
+						}()
 					}
 				}
 			}(doneChan)
 		})

@@ -158,12 +180,17 @@ var _ = Describe("AutoScaler dynamic policy", func() {
 		It("should scale in", func() {
 			WaitForNInstancesRunning(appGUID, 1, 5*time.Minute)
 		})
+
+		AfterEach(func() {
+			// ginkgo requires all go-routines to be finished before reporting the result.
+			// let's wait for all curl executing go-routines to return.
+			wg.Wait()
+		})
 	})
 })

 Context("when scaling by throughput", func() {
-
 	JustBeforeEach(func() {
 		doneChan = make(chan bool)
 		doneAcceptChan = make(chan bool)

@@ -177,12 +204,14 @@ var _ = Describe("AutoScaler dynamic policy", func() {
 	Context("when throughput is greater than scaling out threshold", func() {
 		BeforeEach(func() {
-			policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 1)
+			policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 15)
 			initialInstanceCount = 1
 		})

 		JustBeforeEach(func() {
-			ticker = time.NewTicker(25 * time.Millisecond)
+			// simulate ongoing ~20 requests per second
+			ticker = time.NewTicker(50 * time.Millisecond)
+			appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
 			go func(chan bool) {
 				defer GinkgoRecover()
 				for {
@@ -192,7 +221,11 @@ var _ = Describe("AutoScaler dynamic policy", func() {
 					case <-ticker.C:
-						cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
+						wg.Add(1)
+						go func() {
+							defer wg.Done()
+							cfh.Curl(cfg, appUri)
+						}()
 					}
 				}
 			}(doneChan)

@@ -202,17 +235,25 @@ var _ = Describe("AutoScaler dynamic policy", func() {
 			WaitForNInstancesRunning(appGUID, 2, 5*time.Minute)
 		})

+		AfterEach(func() {
+			// ginkgo requires all go-routines to be finished before reporting the result.
+			// let's wait for all curl executing go-routines to return.
+			wg.Wait()
+		})
+
 	})

 	Context("when throughput is less than scaling in threshold", func() {
 		BeforeEach(func() {
-			policy = GenerateDynamicScaleInPolicy(1, 2, "throughput", 100)
+			policy = GenerateDynamicScaleInPolicyBetween("throughput", 15, 25)
 			initialInstanceCount = 2
 		})

 		JustBeforeEach(func() {
-			ticker = time.NewTicker(10 * time.Second)
+			// simulate ongoing ~20 requests per second
+			ticker = time.NewTicker(50 * time.Millisecond)
+			appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
 			go func(chan bool) {
 				defer GinkgoRecover()
 				for {
@@ -222,16 +263,26 @@ var _ = Describe("AutoScaler dynamic policy", func() {
 						doneAcceptChan <- true
 						return
 					case <-ticker.C:
-						cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
+						wg.Add(1)
+						go func() {
+							defer wg.Done()
+							cfh.Curl(cfg, appUri)
+						}()
 					}
 				}
 			}(doneChan)
 		})
+
 		It("should scale in", func() {
 			WaitForNInstancesRunning(appGUID, 1, 5*time.Minute)
 		})
-	})
+		AfterEach(func() {
+			// ginkgo requires all go-routines to be finished before reporting the result.
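+			// a curl goroutine that fails after the spec has ended could panic
+			// outside of any GinkgoRecover and take the whole suite down.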
+ // let's wait for all curl executing go-routines to return. + wg.Wait() + }) + }) }) // To check existing aggregated cpu metrics do: cf asm APP_NAME cpu diff --git a/src/acceptance/helpers/helpers.go b/src/acceptance/helpers/helpers.go index e8c4712515..ee0ec5be7e 100644 --- a/src/acceptance/helpers/helpers.go +++ b/src/acceptance/helpers/helpers.go @@ -276,6 +276,52 @@ func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName return string(marshaled) } +// GenerateDynamicScaleInPolicyBetween creates a scaling policy, that scales down from 2 instances to 1, if the metric value lies within a given range. +// Example how the scaling rules must be defined to achieve a "scale down if value is in range" behaviour: +// +// val < 10 ➡ +1 ➡ don't do anything if below 10 because there are already 2 instances +// val > 30 ➡ +1 ➡ don't do anything if above 30 because there are already 2 instances +// val <= 30 ➡ -1 ➡ scale down if less than 30 +func GenerateDynamicScaleInPolicyBetween(metricName string, scaleInLowerThreshold int64, scaleInUpperThreshold int64) string { + noDownscalingWhenBelowLower := ScalingRule{ + MetricType: metricName, + BreachDurationSeconds: TestBreachDurationSeconds, + Threshold: scaleInLowerThreshold, + Operator: "<", + CoolDownSeconds: TestCoolDownSeconds, + Adjustment: "+1", + } + + noDownscalingWhenAboveUpper := ScalingRule{ + MetricType: metricName, + BreachDurationSeconds: TestBreachDurationSeconds, + Threshold: scaleInUpperThreshold, + Operator: ">", + CoolDownSeconds: TestCoolDownSeconds, + Adjustment: "+1", + } + + downscalingWhenBelowUpper := ScalingRule{ + MetricType: metricName, + BreachDurationSeconds: TestBreachDurationSeconds, + Threshold: scaleInUpperThreshold, + Operator: "<", + CoolDownSeconds: TestCoolDownSeconds, + Adjustment: "-1", + } + + policy := ScalingPolicy{ + InstanceMin: 1, + InstanceMax: 2, + ScalingRules: []*ScalingRule{&noDownscalingWhenBelowLower, &noDownscalingWhenAboveUpper, &downscalingWhenBelowUpper}, + } + + marshaled, err := MarshalWithoutHTMLEscape(policy) + Expect(err).NotTo(HaveOccurred()) + + return string(marshaled) +} + func GenerateSpecificDateSchedulePolicy(startDateTime, endDateTime time.Time, scheduledInstanceMin, scheduledInstanceMax, scheduledInstanceInit int) string { scalingInRule := ScalingRule{ MetricType: "cpu", diff --git a/src/autoscaler/envelopeprocessor/envelope_processor.go b/src/autoscaler/envelopeprocessor/envelope_processor.go index 5f7894736d..77dd69df43 100644 --- a/src/autoscaler/envelopeprocessor/envelope_processor.go +++ b/src/autoscaler/envelopeprocessor/envelope_processor.go @@ -15,25 +15,22 @@ import ( ) type EnvelopeProcessorCreator interface { - NewProcessor(logger lager.Logger, collectionInterval time.Duration) Processor + NewProcessor(logger lager.Logger) Processor } type EnvelopeProcessor interface { GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, currentTimeStamp int64) ([]models.AppInstanceMetric, error) - GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric } var _ EnvelopeProcessor = &Processor{} type Processor struct { - logger lager.Logger - collectionInterval time.Duration + logger lager.Logger } -func NewProcessor(logger lager.Logger, collectionInterval time.Duration) Processor { +func NewProcessor(logger lager.Logger) Processor { return Processor{ - logger: logger.Session("EnvelopeProcessor"), - collectionInterval: collectionInterval, + logger: logger.Session("EnvelopeProcessor"), } } @@ -43,11 
+40,6 @@ func (p Processor) GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, current p.logger.Debug("Compacted envelopes", lager.Data{"compactedEnvelopes": compactedEnvelopes}) return GetGaugeInstanceMetrics(compactedEnvelopes, currentTimeStamp) } -func (p Processor) GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric { - p.logger.Debug("GetTimerMetrics") - p.logger.Debug("Compacted envelopes", lager.Data{"Envelopes": envelopes}) - return GetHttpStartStopInstanceMetrics(envelopes, appID, currentTimestamp, p.collectionInterval) -} // Log cache returns instance metrics such as cpu and memory in serparate envelopes, this was not the case with // loggregator. We compact this message by matching source_id and timestamp to facilitate metrics calulations. @@ -91,10 +83,10 @@ func GetHttpStartStopInstanceMetrics(envelopes []*loggregator_v2.Envelope, appID var metrics []models.AppInstanceMetric numRequestsPerAppIdx := calcNumReqs(envelopes) - sumReponseTimesPerAppIdx := calcSumResponseTimes(envelopes) + sumResponseTimesPerAppIdx := calcSumResponseTimes(envelopes) throughputMetrics := getThroughputInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, collectionInterval, currentTimestamp) - responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumReponseTimesPerAppIdx, currentTimestamp) + responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumResponseTimesPerAppIdx, currentTimestamp) metrics = append(metrics, throughputMetrics...) metrics = append(metrics, responseTimeMetric...) diff --git a/src/autoscaler/envelopeprocessor/envelopeprocessor_test.go b/src/autoscaler/envelopeprocessor/envelopeprocessor_test.go index d07b60b44b..ebbce1bc6b 100644 --- a/src/autoscaler/envelopeprocessor/envelopeprocessor_test.go +++ b/src/autoscaler/envelopeprocessor/envelopeprocessor_test.go @@ -29,7 +29,7 @@ var _ = Describe("Envelopeprocessor", func() { JustBeforeEach(func() { logger = lagertest.NewTestLogger("envelopeProcessor") - processor = envelopeprocessor.NewProcessor(logger, TestCollectInterval) + processor = envelopeprocessor.NewProcessor(logger) }) Describe("#GetGaugeMetrics", func() { @@ -284,110 +284,8 @@ var _ = Describe("Envelopeprocessor", func() { } }) }) - - Describe("#GetTimerMetrics", func() { - BeforeEach(func() { - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "0", 10*1000*1000, 25*1000*1000, 1111)) - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "1", 10*1000*1000, 30*1000*1000, 1111)) - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "0", 20*1000*1000, 30*1000*1000, 1111)) - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "1", 20*1000*1000, 50*1000*1000, 1111)) - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "1", 20*1000*1000, 30*1000*1000, 1111)) - }) - - It("sends throughput and responsetime metric to channel", func() { - timestamp := time.Now().UnixNano() - metrics := processor.GetTimerMetrics(envelopes, "test-app-id", timestamp) - - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "test-app-id", - InstanceIndex: 0, - CollectedAt: timestamp, - Name: models.MetricNameThroughput, - Unit: models.UnitRPS, - Value: "2", - Timestamp: timestamp, - })) - - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "test-app-id", - InstanceIndex: 0, - CollectedAt: timestamp, - Name: 
models.MetricNameResponseTime, - Unit: models.UnitMilliseconds, - Value: "13", - Timestamp: timestamp, - })) - - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "test-app-id", - InstanceIndex: 1, - CollectedAt: timestamp, - Name: models.MetricNameThroughput, - Unit: models.UnitRPS, - Value: "3", - Timestamp: timestamp, - })) - - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "test-app-id", - InstanceIndex: 1, - CollectedAt: timestamp, - Name: models.MetricNameResponseTime, - Unit: models.UnitMilliseconds, - Value: "20", - Timestamp: timestamp, - })) - }) - - Context("when no available envelopes for app", func() { - BeforeEach(func() { - envelopes = []*loggregator_v2.Envelope{} - }) - - It("sends send 0 throughput and responsetime metric", func() { - timestamp := time.Now().UnixNano() - metrics := processor.GetTimerMetrics(envelopes, "another-test-app-id", timestamp) - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "another-test-app-id", - InstanceIndex: 0, - CollectedAt: timestamp, - Name: models.MetricNameThroughput, - Unit: models.UnitRPS, - Value: "0", - Timestamp: timestamp, - })) - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "another-test-app-id", - InstanceIndex: 0, - CollectedAt: timestamp, - Name: models.MetricNameResponseTime, - Unit: models.UnitMilliseconds, - Value: "0", - Timestamp: timestamp, - })) - - }) - - }) - }) }) -func generateHttpStartStopEnvelope(sourceID, instance string, start, stop, timestamp int64) *loggregator_v2.Envelope { - e := &loggregator_v2.Envelope{ - SourceId: sourceID, - InstanceId: instance, - Message: &loggregator_v2.Envelope_Timer{ - Timer: &loggregator_v2.Timer{ - Name: "http", - Start: start, - Stop: stop, - }, - }, - Timestamp: timestamp, - } - return e -} - func generateMetrics(sourceID string, instance string, metrics map[string]*loggregator_v2.GaugeValue, timestamp int64) *loggregator_v2.Envelope { return &loggregator_v2.Envelope{ SourceId: sourceID, diff --git a/src/autoscaler/eventgenerator/aggregator/metric_poller.go b/src/autoscaler/eventgenerator/aggregator/metric_poller.go index f0639ce426..cf5970b7ae 100644 --- a/src/autoscaler/eventgenerator/aggregator/metric_poller.go +++ b/src/autoscaler/eventgenerator/aggregator/metric_poller.go @@ -65,7 +65,7 @@ func (m *MetricPoller) retrieveMetric(appMonitor *models.AppMonitor) error { metrics, err := m.metricClient.GetMetrics(appId, metricType, startTime, endTime) m.logger.Debug("received metrics from metricClient", lager.Data{"retrievedMetrics": metrics}) if err != nil { - return fmt.Errorf("retriveMetric Failed: %w", err) + return fmt.Errorf("retrieveMetric Failed: %w", err) } avgMetric := m.aggregate(appId, metricType, metrics) m.logger.Debug("save-aggregated-appmetric", lager.Data{"appMetric": avgMetric}) diff --git a/src/autoscaler/eventgenerator/client/log_cache_client.go b/src/autoscaler/eventgenerator/client/log_cache_client.go index c00fa10f81..99eff3ce96 100644 --- a/src/autoscaler/eventgenerator/client/log_cache_client.go +++ b/src/autoscaler/eventgenerator/client/log_cache_client.go @@ -4,8 +4,10 @@ import ( "context" "crypto/tls" "fmt" + "math" "net/http" "net/url" + "strconv" "time" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/envelopeprocessor" @@ -15,7 +17,7 @@ import ( "code.cloudfoundry.org/app-autoscaler/src/autoscaler/models" logcache "code.cloudfoundry.org/go-log-cache/v2" - rpc "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1" + 
"code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1" "code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2" "code.cloudfoundry.org/lager/v3" ) @@ -24,18 +26,20 @@ type LogCacheClient struct { logger lager.Logger Client LogCacheClientReader - now func() time.Time - envelopeProcessor envelopeprocessor.EnvelopeProcessor - goLogCache GoLogCache - TLSConfig *tls.Config - uaaCreds models.UAACreds - url string + now func() time.Time + envelopeProcessor envelopeprocessor.EnvelopeProcessor + goLogCache GoLogCache + TLSConfig *tls.Config + uaaCreds models.UAACreds + url string + collectionInterval time.Duration grpc GRPC } type LogCacheClientReader interface { Read(ctx context.Context, sourceID string, start time.Time, opts ...logcache.ReadOption) ([]*loggregator_v2.Envelope, error) + PromQL(ctx context.Context, query string, opts ...logcache.PromQLOption) (*logcache_v1.PromQL_InstantQueryResult, error) } type GRPCOptions interface { @@ -63,10 +67,10 @@ type GoLogCache struct { } type LogCacheClientCreator interface { - NewLogCacheClient(logger lager.Logger, getTime func() time.Time, envelopeProcessor envelopeprocessor.EnvelopeProcessor, addrs string) MetricClient + NewLogCacheClient(logger lager.Logger, getTime func() time.Time, collectionInterval time.Duration, envelopeProcessor envelopeprocessor.EnvelopeProcessor, url string) MetricClient } -func NewLogCacheClient(logger lager.Logger, getTime func() time.Time, envelopeProcessor envelopeprocessor.EnvelopeProcessor, url string) *LogCacheClient { +func NewLogCacheClient(logger lager.Logger, getTime func() time.Time, collectionInterval time.Duration, envelopeProcessor envelopeprocessor.EnvelopeProcessor, url string) *LogCacheClient { var c = &LogCacheClient{ logger: logger.Session("LogCacheClient"), @@ -80,6 +84,7 @@ func NewLogCacheClient(logger lager.Logger, getTime func() time.Time, envelopePr NewOauth2HTTPClient: logcache.NewOauth2HTTPClient, WithOauth2HTTPClient: logcache.WithOauth2HTTPClient, }, + collectionInterval: collectionInterval, grpc: GRPC{ WithTransportCredentials: gogrpc.WithTransportCredentials, @@ -88,29 +93,122 @@ func NewLogCacheClient(logger lager.Logger, getTime func() time.Time, envelopePr return c } -func (c *LogCacheClient) GetMetrics(appId string, metricType string, startTime time.Time, endTime time.Time) ([]models.AppInstanceMetric, error) { +func (c *LogCacheClient) emptyAppInstanceMetrics(appId string, name string, unit string, now time.Time) ([]models.AppInstanceMetric, error) { + return []models.AppInstanceMetric{ + { + AppId: appId, + InstanceIndex: 0, + Name: name, + Unit: unit, + Value: "0", + CollectedAt: now.UnixNano(), + Timestamp: now.UnixNano(), + }, + }, nil +} + +func (c *LogCacheClient) getMetricsPromQLAPI(appId string, metricType string) ([]models.AppInstanceMetric, error) { + collectionInterval := fmt.Sprintf("%.0f", c.CollectionInterval().Seconds()) + now := time.Now() + + query := "" + metricTypeUnit := "" + if metricType == models.MetricNameThroughput { + query = fmt.Sprintf("sum by (instance_id) (count_over_time(http{source_id='%s'}[%ss])) / %s", appId, collectionInterval, collectionInterval) + metricTypeUnit = models.UnitRPS + } + + if metricType == models.MetricNameResponseTime { + query = fmt.Sprintf("avg by (instance_id) (max_over_time(http{source_id='%s'}[%ss])) / (1000 * 1000)", appId, collectionInterval) + metricTypeUnit = models.UnitMilliseconds + } + + c.logger.Info("query-promql-api", lager.Data{"query": query, "appId": appId, "metricType": metricType}) + result, err := 
c.Client.PromQL(context.Background(), query, logcache.WithPromQLTime(now))
+	if err != nil {
+		return []models.AppInstanceMetric{}, fmt.Errorf("failed getting PromQL result (metricType: %s, appId: %s, collectionInterval: %s, query: %s, time: %s): %w", metricType, appId, collectionInterval, query, now.String(), err)
+	}
+	c.logger.Info("received-promql-api-result", lager.Data{"result": result})
+
+	// safeguard: the query ensures that we get a vector but let's double-check
+	vector := result.GetVector()
+	if vector == nil {
+		return []models.AppInstanceMetric{}, fmt.Errorf("result does not contain a vector")
+	}
+
+	// return empty metrics if there are no samples; this usually happens when there were no recent http-requests towards the application
+	if len(vector.GetSamples()) <= 0 {
+		return c.emptyAppInstanceMetrics(appId, metricType, metricTypeUnit, now)
+	}
+
+	// convert the result into the autoscaler metric model
 	var metrics []models.AppInstanceMetric
+	for _, sample := range vector.GetSamples() {
+		// safeguard: the metric label instance_id should always be there but let's double-check
+		instanceIdStr, ok := sample.GetMetric()["instance_id"]
+		if !ok {
+			return []models.AppInstanceMetric{}, fmt.Errorf("sample does not contain instance_id")
+		}
-	var err error
+		instanceIdUInt, err := strconv.ParseUint(instanceIdStr, 10, 32)
+		if err != nil {
+			return []models.AppInstanceMetric{}, fmt.Errorf("could not convert instance_id to uint32: %w", err)
+		}
+
+		// safeguard: the query ensures that we get a point but let's double-check
+		point := sample.GetPoint()
+		if point == nil {
+			return []models.AppInstanceMetric{}, fmt.Errorf("sample does not contain a point")
+		}
+
+		instanceId := uint32(instanceIdUInt)
+		valueWithoutDecimalsRoundedToCeiling := fmt.Sprintf("%.0f", math.Ceil(point.GetValue()))
+
+		metrics = append(metrics, models.AppInstanceMetric{
+			AppId:         appId,
+			InstanceIndex: instanceId,
+			Name:          metricType,
+			Unit:          metricTypeUnit,
+			Value:         valueWithoutDecimalsRoundedToCeiling,
+			CollectedAt:   now.UnixNano(),
+			Timestamp:     now.UnixNano(),
+		})
+	}
+	return metrics, nil
+}
+
+func (c *LogCacheClient) getMetricsRestAPI(appId string, metricType string, startTime time.Time, endTime time.Time) ([]models.AppInstanceMetric, error) {
 	filters := logCacheFiltersFor(endTime, metricType)
-	c.logger.Debug("GetMetrics", lager.Data{"filters": valuesFrom(filters)})
-	envelopes, err := c.Client.Read(context.Background(), appId, startTime, filters...)
+	c.logger.Info("query-rest-api-with-filters", lager.Data{"filters": valuesFrom(filters)})
+	envelopes, err := c.Client.Read(context.Background(), appId, startTime, filters...)
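+	// note: this REST path now only ever reads GAUGE envelopes (logCacheFiltersFor
+	// always sets EnvelopeType_GAUGE); TIMER envelopes are handled by the PromQL
+	// path in getMetricsPromQLAPI above.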
if err != nil {
-		return metrics, fmt.Errorf("fail to Read %s metric from %s GoLogCache client: %w", getEnvelopeType(metricType), appId, err)
+		return []models.AppInstanceMetric{}, fmt.Errorf("failed to read %s metrics for app %s from GoLogCache client: %w", logcache_v1.EnvelopeType_GAUGE, appId, err)
 	}
+	c.logger.Info("received-rest-api-result", lager.Data{"envelopes": envelopes})

 	collectedAt := c.now().UnixNano()
-	if getEnvelopeType(metricType) == rpc.EnvelopeType_TIMER {
-		metrics = c.envelopeProcessor.GetTimerMetrics(envelopes, appId, collectedAt)
-	} else {
-		c.logger.Debug("envelopes received from log-cache", lager.Data{"envelopes": envelopes})
-		metrics, err = c.envelopeProcessor.GetGaugeMetrics(envelopes, collectedAt)
-	}
+	metrics, err := c.envelopeProcessor.GetGaugeMetrics(envelopes, collectedAt)
+
 	return filter(metrics, metricType), err
 }

+func (c *LogCacheClient) GetMetrics(appId string, metricType string, startTime time.Time, endTime time.Time) ([]models.AppInstanceMetric, error) {
+	c.logger.Debug("GetMetrics")
+
+	// the log-cache REST API returns at most 1000 envelopes: https://github.com/cloudfoundry/log-cache-release/tree/main/src#get-apiv1readsource-id.
+	// a truncated set of envelopes breaks throughput and responsetime, because calculating these metric types correctly requires all envelopes.
+	// pagination via `start_time` and `end_time` could be done but is very error-prone.
+	// the PromQL API has the additional advantage over the REST API that it shifts the metric aggregation into log-cache itself.
+	if metricType == models.MetricNameThroughput || metricType == models.MetricNameResponseTime {
+		c.logger.Debug("get-metrics-via-promql-api", lager.Data{"metricType": metricType})
+		return c.getMetricsPromQLAPI(appId, metricType)
+	}
+
+	c.logger.Debug("get-metrics-via-rest-api", lager.Data{"metricType": metricType})
+	return c.getMetricsRestAPI(appId, metricType, startTime, endTime)
+}
+
 func (c *LogCacheClient) SetTLSConfig(tlsConfig *tls.Config) {
 	c.TLSConfig = tlsConfig
 }

@@ -158,6 +256,10 @@ func (c *LogCacheClient) GetUaaTlsConfig() *tls.Config {
 	return &tls.Config{InsecureSkipVerify: c.uaaCreds.SkipSSLValidation}
 }

+func (c *LogCacheClient) CollectionInterval() time.Duration {
+	return c.collectionInterval
+}
+
 func valuesFrom(filters []logcache.ReadOption) url.Values {
 	values := url.Values{}
 	for _, f := range filters {

@@ -176,10 +278,10 @@ func filter(metrics []models.AppInstanceMetric, metricType string) []models.AppI
 	return result
 }

+
 func logCacheFiltersFor(endTime time.Time, metricType string) (readOptions []logcache.ReadOption) {
-	logMetricType := getEnvelopeType(metricType)
 	readOptions = append(readOptions, logcache.WithEndTime(endTime))
-	readOptions = append(readOptions, logcache.WithEnvelopeTypes(logMetricType))
+	readOptions = append(readOptions, logcache.WithEnvelopeTypes(logcache_v1.EnvelopeType_GAUGE))

 	switch metricType {
 	case models.MetricNameMemoryUtil:

@@ -194,8 +296,6 @@ func logCacheFiltersFor(endTime time.Time, metricType string) (readOptions []log
 		readOptions = append(readOptions, logcache.WithNameFilter("disk"))
 	case models.MetricNameDiskUtil:
 		readOptions = append(readOptions, logcache.WithNameFilter("disk|disk_quota"))
-	case models.MetricNameResponseTime, models.MetricNameThroughput:
-		readOptions = append(readOptions, logcache.WithNameFilter("http"))
 	default:
 		readOptions = append(readOptions, logcache.WithNameFilter(metricType))
 	}

@@ -203,17 +303,6 @@ func logCacheFiltersFor(endTime time.Time, metricType string) (readOptions []log
 	return
readOptions } -func getEnvelopeType(metricType string) rpc.EnvelopeType { - var metricName rpc.EnvelopeType - switch metricType { - case models.MetricNameThroughput, models.MetricNameResponseTime: - metricName = rpc.EnvelopeType_TIMER - default: - metricName = rpc.EnvelopeType_GAUGE - } - return metricName -} - func (c *LogCacheClient) getUaaHttpClient() logcache.HTTPClient { return &http.Client{ Timeout: 5 * time.Second, diff --git a/src/autoscaler/eventgenerator/client/log_cache_client_test.go b/src/autoscaler/eventgenerator/client/log_cache_client_test.go index b4037e7702..030488ffb6 100644 --- a/src/autoscaler/eventgenerator/client/log_cache_client_test.go +++ b/src/autoscaler/eventgenerator/client/log_cache_client_test.go @@ -11,6 +11,7 @@ import ( "strconv" "time" + "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1" gogrpc "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -63,14 +64,11 @@ var _ = Describe("LogCacheClient", func() { {AppId: "some-id"}, } - logCacheClient = NewLogCacheClient( - logger, func() time.Time { return collectedAt }, - fakeEnvelopeProcessor, "") + logCacheClient = NewLogCacheClient(logger, func() time.Time { return collectedAt }, 40*time.Second, fakeEnvelopeProcessor, "") }) JustBeforeEach(func() { fakeGoLogCacheReader.ReadReturns(envelopes, logCacheClientReadError) - fakeEnvelopeProcessor.GetTimerMetricsReturns(metrics) fakeEnvelopeProcessor.GetGaugeMetricsReturnsOnCall(0, metrics, nil) fakeEnvelopeProcessor.GetGaugeMetricsReturnsOnCall(1, nil, errors.New("some error")) @@ -94,9 +92,7 @@ var _ = Describe("LogCacheClient", func() { BeforeEach(func() { expectedAddrs = "logcache:8080" - logCacheClient = NewLogCacheClient( - logger, func() time.Time { return collectedAt }, - fakeEnvelopeProcessor, expectedAddrs) + logCacheClient = NewLogCacheClient(logger, func() time.Time { return collectedAt }, 40*time.Second, fakeEnvelopeProcessor, expectedAddrs) }) Context("when consuming log cache via grpc/mtls", func() { @@ -217,6 +213,13 @@ var _ = Describe("LogCacheClient", func() { }) }) + Describe("CollectionInterval", func() { + It("returns correct collection interval", func() { + logCacheClient = NewLogCacheClient(logger, func() time.Time { return time.Now() }, 40*time.Second, fakeEnvelopeProcessor, "url") + Expect(logCacheClient.CollectionInterval()).To(Equal(40 * time.Second)) + }) + }) + Context("GetMetrics", func() { JustBeforeEach(func() { logCacheClient.Client = fakeGoLogCacheReader @@ -233,45 +236,227 @@ var _ = Describe("LogCacheClient", func() { }) }) - DescribeTable("GetMetrics for startStop Metrics", - func(metricType string, requiredFilters []string) { - metrics = []models.AppInstanceMetric{ - { - AppId: "some-id", - Name: metricType, - }, - } - fakeEnvelopeProcessor.GetTimerMetricsReturnsOnCall(0, metrics) - actualMetrics, err := logCacheClient.GetMetrics(appId, metricType, startTime, endTime) - Expect(err).NotTo(HaveOccurred()) - Expect(actualMetrics).To(Equal(metrics)) + When("errors occur reading via PromQL API", func() { + When("PromQL call fails", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(nil, errors.New("fail")) + _, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) - Expect(err).NotTo(HaveOccurred()) - Expect(fakeGoLogCacheReader.ReadCallCount()).To(Equal(1)) + When("PromQL result is not a vector", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(nil, nil) + _, err := 
logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) - By("Sends the right arguments to log-cache-client") - actualContext, actualAppId, actualStartTime, readOptions := fakeGoLogCacheReader.ReadArgsForCall(0) + When("sample does not contain instance_id", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + // "instance_id": "0", is missing here + }, + }, + }, + }, + }, + }, nil) + _, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) - Expect(actualContext).To(Equal(context.Background())) - Expect(actualAppId).To(Equal(appId)) - Expect(actualStartTime).To(Equal(startTime)) - values := url.Values{} - readOptions[0](nil, values) - Expect(valuesFrom(readOptions[0])["end_time"][0]).To(Equal(strconv.FormatInt(int64(endTime.UnixNano()), 10))) - Expect(valuesFrom(readOptions[1])["envelope_types"][0]).To(Equal("TIMER")) - Expect(len(readOptions)).To(Equal(3), "filters by envelope type and metric names based on the requested metric type sent to GetMetrics") - Expect(valuesFrom(readOptions[2])["name_filter"][0]).To(Equal(requiredFilters[2])) + When("instance_id can not be parsed to uint", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + "instance_id": "iam-no-uint", + }, + }, + }, + }, + }, + }, nil) + _, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) - By("Sends the right arguments to the timer processor") - Expect(fakeEnvelopeProcessor.GetTimerMetricsCallCount()).To(Equal(1), "Should call GetHttpStartStopInstanceMetricsCallCount once") - actualEnvelopes, actualAppId, actualCurrentTimestamp := fakeEnvelopeProcessor.GetTimerMetricsArgsForCall(0) - Expect(actualEnvelopes).To(Equal(envelopes)) - Expect(actualAppId).To(Equal(appId)) - Expect(actualCurrentTimestamp).To(Equal(collectedAt.UnixNano())) - }, - Entry("When metric type is throughput", models.MetricNameThroughput, []string{"endtime", "envelope_type", "http"}), - Entry("When metric type is responsetime", models.MetricNameResponseTime, []string{"endtime", "envelope_type", "http"}), - ) + When("sample does not contain a point", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + "instance_id": "0", + }, + }, + }, + }, + }, + }, nil) + _, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) + }) + + When("reading throughput metrics", func() { + When("PromQL API returns a vector with no samples", func() { + It("returns empty metrics", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{}, + }, + }, nil) + + metrics, err := 
logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + + Expect(err).To(Not(HaveOccurred())) + Expect(metrics).To(HaveLen(1)) + + Expect(metrics[0].AppId).To(Equal("app-id")) + Expect(metrics[0].InstanceIndex).To(Equal(uint32(0))) + Expect(metrics[0].Name).To(Equal("throughput")) + Expect(metrics[0].Unit).To(Equal("rps")) + Expect(metrics[0].Value).To(Equal("0")) + }) + }) + + When("promql api returns a vector with samples", func() { + It("returns metrics", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + "instance_id": "0", + }, + Point: &logcache_v1.PromQL_Point{ + Value: 123, + }, + }, + { + Metric: map[string]string{ + "instance_id": "1", + }, + Point: &logcache_v1.PromQL_Point{ + Value: 321, + }, + }, + }, + }, + }, + }, nil) + + metrics, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + + _, query, _ := fakeGoLogCacheReader.PromQLArgsForCall(0) + Expect(query).To(Equal("sum by (instance_id) (count_over_time(http{source_id='app-id'}[40s])) / 40")) + + Expect(err).To(Not(HaveOccurred())) + Expect(metrics).To(HaveLen(2)) + + Expect(metrics[0].AppId).To(Equal("app-id")) + Expect(metrics[0].InstanceIndex).To(Equal(uint32(0))) + Expect(metrics[0].Name).To(Equal("throughput")) + Expect(metrics[0].Unit).To(Equal("rps")) + Expect(metrics[0].Value).To(Equal("123")) + + Expect(metrics[1].AppId).To(Equal("app-id")) + Expect(metrics[1].InstanceIndex).To(Equal(uint32(1))) + Expect(metrics[1].Name).To(Equal("throughput")) + Expect(metrics[1].Unit).To(Equal("rps")) + Expect(metrics[1].Value).To(Equal("321")) + }) + }) + }) + + When("reading responsetime metrics", func() { + When("PromQL API returns a vector with no samples", func() { + It("returns empty metrics", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{}, + }, + }, nil) + + metrics, err := logCacheClient.GetMetrics("app-id", "responsetime", startTime, endTime) + + Expect(err).To(Not(HaveOccurred())) + Expect(metrics).To(HaveLen(1)) + + Expect(metrics[0].AppId).To(Equal("app-id")) + Expect(metrics[0].InstanceIndex).To(Equal(uint32(0))) + Expect(metrics[0].Name).To(Equal("responsetime")) + Expect(metrics[0].Unit).To(Equal("ms")) + Expect(metrics[0].Value).To(Equal("0")) + }) + }) + + When("promql api returns a vector with samples", func() { + It("returns metrics", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + "instance_id": "0", + }, + Point: &logcache_v1.PromQL_Point{ + Value: 200, + }, + }, + { + Metric: map[string]string{ + "instance_id": "1", + }, + Point: &logcache_v1.PromQL_Point{ + Value: 300, + }, + }, + }, + }, + }, + }, nil) + + metrics, err := logCacheClient.GetMetrics("app-id", "responsetime", startTime, endTime) + + _, query, _ := fakeGoLogCacheReader.PromQLArgsForCall(0) + Expect(query).To(Equal("avg by (instance_id) (max_over_time(http{source_id='app-id'}[40s])) / (1000 * 1000)")) + + Expect(err).To(Not(HaveOccurred())) + Expect(metrics).To(HaveLen(2)) + + Expect(metrics[0].AppId).To(Equal("app-id")) + 
Expect(metrics[0].InstanceIndex).To(Equal(uint32(0))) + Expect(metrics[0].Name).To(Equal("responsetime")) + Expect(metrics[0].Unit).To(Equal("ms")) + Expect(metrics[0].Value).To(Equal("200")) + + Expect(metrics[1].AppId).To(Equal("app-id")) + Expect(metrics[1].InstanceIndex).To(Equal(uint32(1))) + Expect(metrics[1].Name).To(Equal("responsetime")) + Expect(metrics[1].Unit).To(Equal("ms")) + Expect(metrics[1].Value).To(Equal("300")) + }) + }) + }) DescribeTable("GetMetrics for Gauge Metrics", func(autoscalerMetricType string, requiredFilters []string) { @@ -300,8 +485,6 @@ var _ = Describe("LogCacheClient", func() { // after starTime and envelopeType we filter the metric names Expect(valuesFrom(readOptions[2])["name_filter"][0]).To(Equal(requiredFilters[2])) - Expect(fakeEnvelopeProcessor.GetTimerMetricsCallCount()).To(Equal(0)) - By("Sends the right arguments to the gauge processor") actualEnvelopes, actualCurrentTimestamp := fakeEnvelopeProcessor.GetGaugeMetricsArgsForCall(0) Expect(actualEnvelopes).To(Equal(envelopes)) diff --git a/src/autoscaler/eventgenerator/client/metric_client_factory.go b/src/autoscaler/eventgenerator/client/metric_client_factory.go index 4583e2ddd5..cfeaafd218 100644 --- a/src/autoscaler/eventgenerator/client/metric_client_factory.go +++ b/src/autoscaler/eventgenerator/client/metric_client_factory.go @@ -34,8 +34,8 @@ func (mc *MetricClientFactory) GetMetricClient(logger lager.Logger, conf *config } func (mc *MetricClientFactory) createLogCacheMetricClient(logger lager.Logger, conf *config.Config) MetricClient { - envelopeProcessor := NewProcessor(logger, conf.Aggregator.AggregatorExecuteInterval) - c := NewLogCacheClient(logger, time.Now, envelopeProcessor, conf.MetricCollector.MetricCollectorURL) + envelopeProcessor := NewProcessor(logger) + c := NewLogCacheClient(logger, time.Now, conf.Aggregator.AggregatorExecuteInterval, envelopeProcessor, conf.MetricCollector.MetricCollectorURL) if hasUAACreds(conf) { uaaCreds := models.UAACreds{ diff --git a/src/autoscaler/eventgenerator/client/metric_client_factory_test.go b/src/autoscaler/eventgenerator/client/metric_client_factory_test.go index 3776bb6739..25d9569b7f 100644 --- a/src/autoscaler/eventgenerator/client/metric_client_factory_test.go +++ b/src/autoscaler/eventgenerator/client/metric_client_factory_test.go @@ -131,9 +131,8 @@ var _ = Describe("MetricClientFactory", func() { }) }) - It("Should set AggregatorExecuteInterval as processor collectionInterval", func() { - _, actualCollectionInterval := fakeEnvelopeProcessorCreator.NewProcessorArgsForCall(0) - Expect(actualCollectionInterval).To(Equal(conf.Aggregator.AggregatorExecuteInterval)) + It("Should set AggregatorExecuteInterval as collectionInterval on LogCacheClient", func() { + Expect(metricClient.(*LogCacheClient).CollectionInterval()).To(Equal(conf.Aggregator.AggregatorExecuteInterval)) }) }) }) diff --git a/src/autoscaler/eventgenerator/client/metric_server_client.go b/src/autoscaler/eventgenerator/client/metric_server_client.go index 2c7bdcd3b2..90669ffe97 100644 --- a/src/autoscaler/eventgenerator/client/metric_server_client.go +++ b/src/autoscaler/eventgenerator/client/metric_server_client.go @@ -34,7 +34,7 @@ func NewMetricServerClient(logger lager.Logger, url string, httpClient *http.Cli } } func (c *MetricServerClient) GetMetrics(appId string, metricType string, startTime time.Time, endTime time.Time) ([]models.AppInstanceMetric, error) { - c.logger.Debug("GetMetric") + c.logger.Debug("GetMetrics") var url string path, err := 
routes.MetricsCollectorRoutes().Get(routes.GetMetricHistoriesRouteName).URLPath("appid", appId, "metrictype", metricType)
 	if err != nil {
diff --git a/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_test.go b/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_test.go
index 2124302503..21e07bcde5 100644
--- a/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_test.go
+++ b/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_test.go
@@ -267,8 +267,8 @@ var _ = Describe("Eventgenerator", func() {
 	})

 	It("Should create a LogCacheClient", func() {
-		Eventually(runner.Session.Buffer()).ShouldNot(Say("eventgenerator.MetricServerClient.GetMetric"))
-		Eventually(runner.Session.Buffer(), 2).Should(Say("eventgenerator.LogCacheClient.GetMetric"))
+		Eventually(runner.Session.Buffer()).ShouldNot(Say("eventgenerator.MetricServerClient.GetMetrics"))
+		Eventually(runner.Session.Buffer(), 2).Should(Say("eventgenerator.LogCacheClient.GetMetrics"))
 	})

 	It("Should initialize an envelopeProcessor", func() {
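
For reviewers who want to replay the new queries against a running log-cache by hand, here is a minimal, hypothetical sketch built on the same go-log-cache v2 client this patch uses. The app GUID, the 40-second window (mirroring the AggregatorExecuteInterval used in the tests) and the LOG_CACHE_ADDR environment variable are illustrative values, not part of the change:

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	logcache "code.cloudfoundry.org/go-log-cache/v2"
)

func main() {
	appGUID := "00000000-0000-0000-0000-000000000000" // placeholder app GUID
	interval := 40                                    // seconds; assumed collection interval

	// the two queries getMetricsPromQLAPI renders for throughput and responsetime
	queries := []string{
		fmt.Sprintf("sum by (instance_id) (count_over_time(http{source_id='%s'}[%ds])) / %d", appGUID, interval, interval),
		fmt.Sprintf("avg by (instance_id) (max_over_time(http{source_id='%s'}[%ds])) / (1000 * 1000)", appGUID, interval),
	}

	client := logcache.NewClient(os.Getenv("LOG_CACHE_ADDR"))
	for _, query := range queries {
		result, err := client.PromQL(context.Background(), query)
		if err != nil {
			log.Fatalf("PromQL %q failed: %v", query, err)
		}
		// one sample per instance_id, matching the per-instance metrics built in the patch
		for _, sample := range result.GetVector().GetSamples() {
			fmt.Printf("%s => instance %s: %f\n", query, sample.GetMetric()["instance_id"], sample.GetPoint().GetValue())
		}
	}
}

Because the aggregation happens inside log-cache, the 1000-envelope limit of the read endpoint described in GetMetrics never comes into play for these two metric types.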