diff --git a/src/acceptance/app/dynamic_policy_test.go b/src/acceptance/app/dynamic_policy_test.go index 1d2b181997..b283bf6871 100644 --- a/src/acceptance/app/dynamic_policy_test.go +++ b/src/acceptance/app/dynamic_policy_test.go @@ -4,6 +4,7 @@ import ( "acceptance" . "acceptance/helpers" "fmt" + "sync" "time" cfh "github.com/cloudfoundry/cf-test-helpers/v2/helpers" @@ -19,6 +20,7 @@ var _ = Describe("AutoScaler dynamic policy", func() { doneAcceptChan chan bool ticker *time.Ticker maxHeapLimitMb int + wg sync.WaitGroup ) const minimalMemoryUsage = 28 // observed minimal memory usage by the test app @@ -30,6 +32,8 @@ var _ = Describe("AutoScaler dynamic policy", func() { Expect(err).NotTo(HaveOccurred()) StartApp(appName, cfg.CfPushTimeoutDuration()) instanceName = CreatePolicy(cfg, appName, appGUID, policy) + + wg = sync.WaitGroup{} }) BeforeEach(func() { maxHeapLimitMb = cfg.NodeMemoryLimit - minimalMemoryUsage @@ -163,7 +167,6 @@ var _ = Describe("AutoScaler dynamic policy", func() { }) Context("when scaling by throughput", func() { - JustBeforeEach(func() { doneChan = make(chan bool) doneAcceptChan = make(chan bool) @@ -177,12 +180,14 @@ var _ = Describe("AutoScaler dynamic policy", func() { Context("when throughput is greater than scaling out threshold", func() { BeforeEach(func() { - policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 1) + policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 50) initialInstanceCount = 1 }) JustBeforeEach(func() { - ticker = time.NewTicker(25 * time.Millisecond) + // simulate ongoing ~100 requests per second + ticker = time.NewTicker(10 * time.Millisecond) + appUri := cfh.AppUri(appName, "/responsetime/fast", cfg) go func(chan bool) { defer GinkgoRecover() for { @@ -192,7 +197,11 @@ var _ = Describe("AutoScaler dynamic policy", func() { doneAcceptChan <- true return case <-ticker.C: - cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f") + wg.Add(1) + go func() { + defer wg.Done() + cfh.Curl(cfg, appUri) + }() } } }(doneChan) @@ -202,17 +211,25 @@ var _ = Describe("AutoScaler dynamic policy", func() { WaitForNInstancesRunning(appGUID, 2, 5*time.Minute) }) + AfterEach(func() { + // ginkgo requires all go-routines to be finished before reporting the result. + // let's wait for all curl executing go-routines to return. + wg.Wait() + }) + }) Context("when throughput is less than scaling in threshold", func() { BeforeEach(func() { - policy = GenerateDynamicScaleInPolicy(1, 2, "throughput", 100) + policy = GenerateDynamicScaleInPolicy(1, 2, "throughput", 30) initialInstanceCount = 2 }) JustBeforeEach(func() { - ticker = time.NewTicker(10 * time.Second) + // simulate ongoing ~20 requests per second + ticker = time.NewTicker(50 * time.Millisecond) + appUri := cfh.AppUri(appName, "/responsetime/fast", cfg) go func(chan bool) { defer GinkgoRecover() for { @@ -222,16 +239,26 @@ var _ = Describe("AutoScaler dynamic policy", func() { doneAcceptChan <- true return case <-ticker.C: - cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f") + wg.Add(1) + go func() { + defer wg.Done() + cfh.Curl(cfg, appUri) + }() } } }(doneChan) }) - It("should scale in", func() { + + FIt("should scale in", func() { WaitForNInstancesRunning(appGUID, 1, 5*time.Minute) }) - }) + AfterEach(func() { + // ginkgo requires all go-routines to be finished before reporting the result. + // let's wait for all curl executing go-routines to return. + wg.Wait() + }) + }) }) // To check existing aggregated cpu metrics do: cf asm APP_NAME cpu diff --git a/src/autoscaler/envelopeprocessor/envelope_processor.go b/src/autoscaler/envelopeprocessor/envelope_processor.go index 5f7894736d..38d407f4bb 100644 --- a/src/autoscaler/envelopeprocessor/envelope_processor.go +++ b/src/autoscaler/envelopeprocessor/envelope_processor.go @@ -20,7 +20,7 @@ type EnvelopeProcessorCreator interface { type EnvelopeProcessor interface { GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, currentTimeStamp int64) ([]models.AppInstanceMetric, error) - GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric + GetCollectionInterval() time.Duration } var _ EnvelopeProcessor = &Processor{} @@ -43,10 +43,9 @@ func (p Processor) GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, current p.logger.Debug("Compacted envelopes", lager.Data{"compactedEnvelopes": compactedEnvelopes}) return GetGaugeInstanceMetrics(compactedEnvelopes, currentTimeStamp) } -func (p Processor) GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric { - p.logger.Debug("GetTimerMetrics") - p.logger.Debug("Compacted envelopes", lager.Data{"Envelopes": envelopes}) - return GetHttpStartStopInstanceMetrics(envelopes, appID, currentTimestamp, p.collectionInterval) + +func (p Processor) GetCollectionInterval() time.Duration { + return p.collectionInterval } // Log cache returns instance metrics such as cpu and memory in serparate envelopes, this was not the case with @@ -91,10 +90,10 @@ func GetHttpStartStopInstanceMetrics(envelopes []*loggregator_v2.Envelope, appID var metrics []models.AppInstanceMetric numRequestsPerAppIdx := calcNumReqs(envelopes) - sumReponseTimesPerAppIdx := calcSumResponseTimes(envelopes) + sumResponseTimesPerAppIdx := calcSumResponseTimes(envelopes) throughputMetrics := getThroughputInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, collectionInterval, currentTimestamp) - responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumReponseTimesPerAppIdx, currentTimestamp) + responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumResponseTimesPerAppIdx, currentTimestamp) metrics = append(metrics, throughputMetrics...) metrics = append(metrics, responseTimeMetric...) diff --git a/src/autoscaler/envelopeprocessor/envelopeprocessor_test.go b/src/autoscaler/envelopeprocessor/envelopeprocessor_test.go index d07b60b44b..1630575b9f 100644 --- a/src/autoscaler/envelopeprocessor/envelopeprocessor_test.go +++ b/src/autoscaler/envelopeprocessor/envelopeprocessor_test.go @@ -285,89 +285,11 @@ var _ = Describe("Envelopeprocessor", func() { }) }) - Describe("#GetTimerMetrics", func() { - BeforeEach(func() { - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "0", 10*1000*1000, 25*1000*1000, 1111)) - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "1", 10*1000*1000, 30*1000*1000, 1111)) - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "0", 20*1000*1000, 30*1000*1000, 1111)) - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "1", 20*1000*1000, 50*1000*1000, 1111)) - envelopes = append(envelopes, generateHttpStartStopEnvelope("test-app-id", "1", 20*1000*1000, 30*1000*1000, 1111)) - }) - - It("sends throughput and responsetime metric to channel", func() { - timestamp := time.Now().UnixNano() - metrics := processor.GetTimerMetrics(envelopes, "test-app-id", timestamp) - - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "test-app-id", - InstanceIndex: 0, - CollectedAt: timestamp, - Name: models.MetricNameThroughput, - Unit: models.UnitRPS, - Value: "2", - Timestamp: timestamp, - })) - - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "test-app-id", - InstanceIndex: 0, - CollectedAt: timestamp, - Name: models.MetricNameResponseTime, - Unit: models.UnitMilliseconds, - Value: "13", - Timestamp: timestamp, - })) - - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "test-app-id", - InstanceIndex: 1, - CollectedAt: timestamp, - Name: models.MetricNameThroughput, - Unit: models.UnitRPS, - Value: "3", - Timestamp: timestamp, - })) - - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "test-app-id", - InstanceIndex: 1, - CollectedAt: timestamp, - Name: models.MetricNameResponseTime, - Unit: models.UnitMilliseconds, - Value: "20", - Timestamp: timestamp, - })) - }) - - Context("when no available envelopes for app", func() { - BeforeEach(func() { - envelopes = []*loggregator_v2.Envelope{} + Describe("GetCollectionInterval", func() { + When("interval is set", func() { + It("returns interval", func() { + Expect(processor.GetCollectionInterval()).To(Equal(TestCollectInterval)) }) - - It("sends send 0 throughput and responsetime metric", func() { - timestamp := time.Now().UnixNano() - metrics := processor.GetTimerMetrics(envelopes, "another-test-app-id", timestamp) - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "another-test-app-id", - InstanceIndex: 0, - CollectedAt: timestamp, - Name: models.MetricNameThroughput, - Unit: models.UnitRPS, - Value: "0", - Timestamp: timestamp, - })) - Expect(metrics).To(ContainElement(models.AppInstanceMetric{ - AppId: "another-test-app-id", - InstanceIndex: 0, - CollectedAt: timestamp, - Name: models.MetricNameResponseTime, - Unit: models.UnitMilliseconds, - Value: "0", - Timestamp: timestamp, - })) - - }) - }) }) }) diff --git a/src/autoscaler/eventgenerator/aggregator/metric_poller.go b/src/autoscaler/eventgenerator/aggregator/metric_poller.go index f0639ce426..cf5970b7ae 100644 --- a/src/autoscaler/eventgenerator/aggregator/metric_poller.go +++ b/src/autoscaler/eventgenerator/aggregator/metric_poller.go @@ -65,7 +65,7 @@ func (m *MetricPoller) retrieveMetric(appMonitor *models.AppMonitor) error { metrics, err := m.metricClient.GetMetrics(appId, metricType, startTime, endTime) m.logger.Debug("received metrics from metricClient", lager.Data{"retrievedMetrics": metrics}) if err != nil { - return fmt.Errorf("retriveMetric Failed: %w", err) + return fmt.Errorf("retrieveMetric Failed: %w", err) } avgMetric := m.aggregate(appId, metricType, metrics) m.logger.Debug("save-aggregated-appmetric", lager.Data{"appMetric": avgMetric}) diff --git a/src/autoscaler/eventgenerator/client/log_cache_client.go b/src/autoscaler/eventgenerator/client/log_cache_client.go index c00fa10f81..dee7a9cc5a 100644 --- a/src/autoscaler/eventgenerator/client/log_cache_client.go +++ b/src/autoscaler/eventgenerator/client/log_cache_client.go @@ -4,8 +4,10 @@ import ( "context" "crypto/tls" "fmt" + "math" "net/http" "net/url" + "strconv" "time" "code.cloudfoundry.org/app-autoscaler/src/autoscaler/envelopeprocessor" @@ -15,6 +17,7 @@ import ( "code.cloudfoundry.org/app-autoscaler/src/autoscaler/models" logcache "code.cloudfoundry.org/go-log-cache/v2" + "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1" rpc "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1" "code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2" "code.cloudfoundry.org/lager/v3" @@ -36,6 +39,7 @@ type LogCacheClient struct { type LogCacheClientReader interface { Read(ctx context.Context, sourceID string, start time.Time, opts ...logcache.ReadOption) ([]*loggregator_v2.Envelope, error) + PromQL(ctx context.Context, query string, opts ...logcache.PromQLOption) (*logcache_v1.PromQL_InstantQueryResult, error) } type GRPCOptions interface { @@ -88,26 +92,105 @@ func NewLogCacheClient(logger lager.Logger, getTime func() time.Time, envelopePr return c } +func (c *LogCacheClient) emptyAppInstanceMetrics(appId string, name string, unit string, now time.Time) ([]models.AppInstanceMetric, error) { + return []models.AppInstanceMetric{ + { + AppId: appId, + InstanceIndex: 0, + Name: name, + Unit: unit, + Value: "0", + CollectedAt: now.UnixNano(), + Timestamp: now.UnixNano(), + }, + }, nil +} + func (c *LogCacheClient) GetMetrics(appId string, metricType string, startTime time.Time, endTime time.Time) ([]models.AppInstanceMetric, error) { var metrics []models.AppInstanceMetric - var err error + if metricType == models.MetricNameThroughput || metricType == models.MetricNameResponseTime { + collectionInterval := fmt.Sprintf("%.0f", c.envelopeProcessor.GetCollectionInterval().Seconds()) + now := time.Now() + + query := "" + metricTypeUnit := "" + if metricType == models.MetricNameThroughput { + query = fmt.Sprintf("sum by (instance_id) (count_over_time(http{source_id='%s'}[%ss])) / %s", appId, collectionInterval, collectionInterval) + metricTypeUnit = models.UnitRPS + } + + if metricType == models.MetricNameResponseTime { + query = fmt.Sprintf("avg by (instance_id) (max_over_time(http{source_id='%s'}[%ss])) / (1000 * 1000)", appId, collectionInterval) + metricTypeUnit = models.UnitMilliseconds + } + + c.logger.Info("get-metrics-promql-query", lager.Data{"query": query, "appId": appId, "metricType": metricType}) + result, err := c.Client.PromQL(context.Background(), query, logcache.WithPromQLTime(now)) + if err != nil { + return []models.AppInstanceMetric{}, fmt.Errorf("failed getting PromQL result (metricType: %s, appId: %s, collectionInterval: %s, query: %s, time: %s): %w", metricType, appId, collectionInterval, query, now.String(), err) + } + + // safeguard: the query ensures that we get a vector but let's double-check + vector := result.GetVector() + if vector == nil { + return []models.AppInstanceMetric{}, fmt.Errorf("result does not contain a vector") + } + + // return empty metrics if there are no samples, this usually happens in case there were no recent http-requests towards the application + if len(vector.GetSamples()) <= 0 { + return c.emptyAppInstanceMetrics(appId, metricType, metricTypeUnit, now) + } + + // convert result into autoscaler metric model + var metrics []models.AppInstanceMetric + for _, sample := range vector.GetSamples() { + // safeguard: metric label instance_id should be always there but let's double-check + instanceIdStr, ok := sample.GetMetric()["instance_id"] + if !ok { + return []models.AppInstanceMetric{}, fmt.Errorf("sample does not contain instance_id: %w", err) + } + + instanceIdUInt, err := strconv.ParseUint(instanceIdStr, 10, 32) + if err != nil { + return []models.AppInstanceMetric{}, fmt.Errorf("could not convert instance_id to uint32: %w", err) + } + + // safeguard: the query ensures that we get a point but let's double-check + point := sample.GetPoint() + if point == nil { + return []models.AppInstanceMetric{}, fmt.Errorf("sample does not contain a point") + } + + instanceId := uint32(instanceIdUInt) + valueWithoutDecimalsRoundedToCeiling := fmt.Sprintf("%.0f", math.Ceil(point.GetValue())) + + metrics = append(metrics, models.AppInstanceMetric{ + AppId: appId, + InstanceIndex: instanceId, + Name: metricType, + Unit: metricTypeUnit, + Value: valueWithoutDecimalsRoundedToCeiling, + CollectedAt: now.UnixNano(), + Timestamp: now.UnixNano(), + }) + } + return metrics, nil + } + filters := logCacheFiltersFor(endTime, metricType) c.logger.Debug("GetMetrics", lager.Data{"filters": valuesFrom(filters)}) envelopes, err := c.Client.Read(context.Background(), appId, startTime, filters...) if err != nil { - return metrics, fmt.Errorf("fail to Read %s metric from %s GoLogCache client: %w", getEnvelopeType(metricType), appId, err) + return metrics, fmt.Errorf("fail to Read %s metric from %s GoLogCache client: %w", rpc.EnvelopeType_GAUGE, appId, err) } collectedAt := c.now().UnixNano() - if getEnvelopeType(metricType) == rpc.EnvelopeType_TIMER { - metrics = c.envelopeProcessor.GetTimerMetrics(envelopes, appId, collectedAt) - } else { - c.logger.Debug("envelopes received from log-cache", lager.Data{"envelopes": envelopes}) - metrics, err = c.envelopeProcessor.GetGaugeMetrics(envelopes, collectedAt) - } + c.logger.Debug("envelopes received from log-cache", lager.Data{"envelopes": envelopes}) + metrics, err = c.envelopeProcessor.GetGaugeMetrics(envelopes, collectedAt) + return filter(metrics, metricType), err } @@ -176,10 +259,10 @@ func filter(metrics []models.AppInstanceMetric, metricType string) []models.AppI return result } + func logCacheFiltersFor(endTime time.Time, metricType string) (readOptions []logcache.ReadOption) { - logMetricType := getEnvelopeType(metricType) readOptions = append(readOptions, logcache.WithEndTime(endTime)) - readOptions = append(readOptions, logcache.WithEnvelopeTypes(logMetricType)) + readOptions = append(readOptions, logcache.WithEnvelopeTypes(rpc.EnvelopeType_GAUGE)) switch metricType { case models.MetricNameMemoryUtil: @@ -194,8 +277,6 @@ func logCacheFiltersFor(endTime time.Time, metricType string) (readOptions []log readOptions = append(readOptions, logcache.WithNameFilter("disk")) case models.MetricNameDiskUtil: readOptions = append(readOptions, logcache.WithNameFilter("disk|disk_quota")) - case models.MetricNameResponseTime, models.MetricNameThroughput: - readOptions = append(readOptions, logcache.WithNameFilter("http")) default: readOptions = append(readOptions, logcache.WithNameFilter(metricType)) } @@ -203,17 +284,6 @@ func logCacheFiltersFor(endTime time.Time, metricType string) (readOptions []log return readOptions } -func getEnvelopeType(metricType string) rpc.EnvelopeType { - var metricName rpc.EnvelopeType - switch metricType { - case models.MetricNameThroughput, models.MetricNameResponseTime: - metricName = rpc.EnvelopeType_TIMER - default: - metricName = rpc.EnvelopeType_GAUGE - } - return metricName -} - func (c *LogCacheClient) getUaaHttpClient() logcache.HTTPClient { return &http.Client{ Timeout: 5 * time.Second, diff --git a/src/autoscaler/eventgenerator/client/log_cache_client_test.go b/src/autoscaler/eventgenerator/client/log_cache_client_test.go index b4037e7702..21271c53be 100644 --- a/src/autoscaler/eventgenerator/client/log_cache_client_test.go +++ b/src/autoscaler/eventgenerator/client/log_cache_client_test.go @@ -11,6 +11,7 @@ import ( "strconv" "time" + "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1" gogrpc "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -70,7 +71,6 @@ var _ = Describe("LogCacheClient", func() { JustBeforeEach(func() { fakeGoLogCacheReader.ReadReturns(envelopes, logCacheClientReadError) - fakeEnvelopeProcessor.GetTimerMetricsReturns(metrics) fakeEnvelopeProcessor.GetGaugeMetricsReturnsOnCall(0, metrics, nil) fakeEnvelopeProcessor.GetGaugeMetricsReturnsOnCall(1, nil, errors.New("some error")) @@ -233,45 +233,229 @@ var _ = Describe("LogCacheClient", func() { }) }) - DescribeTable("GetMetrics for startStop Metrics", - func(metricType string, requiredFilters []string) { - metrics = []models.AppInstanceMetric{ - { - AppId: "some-id", - Name: metricType, - }, - } - fakeEnvelopeProcessor.GetTimerMetricsReturnsOnCall(0, metrics) - actualMetrics, err := logCacheClient.GetMetrics(appId, metricType, startTime, endTime) - Expect(err).NotTo(HaveOccurred()) - Expect(actualMetrics).To(Equal(metrics)) + When("errors occur reading via PromQL API", func() { + When("PromQL call fails", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(nil, errors.New("fail")) + _, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) - Expect(err).NotTo(HaveOccurred()) - Expect(fakeGoLogCacheReader.ReadCallCount()).To(Equal(1)) + When("PromQL result is not a vector", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(nil, nil) + _, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) - By("Sends the right arguments to log-cache-client") - actualContext, actualAppId, actualStartTime, readOptions := fakeGoLogCacheReader.ReadArgsForCall(0) + When("sample does not contain instance_id", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + // "instance_id": "0", is missing here + }, + }, + }, + }, + }, + }, nil) + _, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) - Expect(actualContext).To(Equal(context.Background())) - Expect(actualAppId).To(Equal(appId)) - Expect(actualStartTime).To(Equal(startTime)) - values := url.Values{} - readOptions[0](nil, values) - Expect(valuesFrom(readOptions[0])["end_time"][0]).To(Equal(strconv.FormatInt(int64(endTime.UnixNano()), 10))) - Expect(valuesFrom(readOptions[1])["envelope_types"][0]).To(Equal("TIMER")) - Expect(len(readOptions)).To(Equal(3), "filters by envelope type and metric names based on the requested metric type sent to GetMetrics") - Expect(valuesFrom(readOptions[2])["name_filter"][0]).To(Equal(requiredFilters[2])) + When("instance_id can not be parsed to uint", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + "instance_id": "iam-no-uint", + }, + }, + }, + }, + }, + }, nil) + _, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) - By("Sends the right arguments to the timer processor") - Expect(fakeEnvelopeProcessor.GetTimerMetricsCallCount()).To(Equal(1), "Should call GetHttpStartStopInstanceMetricsCallCount once") - actualEnvelopes, actualAppId, actualCurrentTimestamp := fakeEnvelopeProcessor.GetTimerMetricsArgsForCall(0) - Expect(actualEnvelopes).To(Equal(envelopes)) - Expect(actualAppId).To(Equal(appId)) - Expect(actualCurrentTimestamp).To(Equal(collectedAt.UnixNano())) - }, - Entry("When metric type is throughput", models.MetricNameThroughput, []string{"endtime", "envelope_type", "http"}), - Entry("When metric type is responsetime", models.MetricNameResponseTime, []string{"endtime", "envelope_type", "http"}), - ) + When("sample does not contain a point", func() { + It("returns an error", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + "instance_id": "0", + }, + }, + }, + }, + }, + }, nil) + _, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + Expect(err).To(HaveOccurred()) + }) + }) + }) + + When("reading throughput metrics", func() { + When("PromQL API returns a vector with no samples", func() { + It("returns empty metrics", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{}, + }, + }, nil) + + metrics, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + + Expect(err).To(Not(HaveOccurred())) + Expect(metrics).To(HaveLen(1)) + + Expect(metrics[0].AppId).To(Equal("app-id")) + Expect(metrics[0].InstanceIndex).To(Equal(uint32(0))) + Expect(metrics[0].Name).To(Equal("throughput")) + Expect(metrics[0].Unit).To(Equal("rps")) + Expect(metrics[0].Value).To(Equal("0")) + }) + }) + + When("promql api returns a vector with samples", func() { + It("returns metrics ", func() { + fakeEnvelopeProcessor.GetCollectionIntervalReturns(40 * time.Second) + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + "instance_id": "0", + }, + Point: &logcache_v1.PromQL_Point{ + Value: 123, + }, + }, + { + Metric: map[string]string{ + "instance_id": "1", + }, + Point: &logcache_v1.PromQL_Point{ + Value: 321, + }, + }, + }, + }, + }, + }, nil) + + metrics, err := logCacheClient.GetMetrics("app-id", "throughput", startTime, endTime) + + _, query, _ := fakeGoLogCacheReader.PromQLArgsForCall(0) + Expect(query).To(Equal("sum by (instance_id) (count_over_time(http{source_id='app-id'}[40s])) / 40")) + + Expect(err).To(Not(HaveOccurred())) + Expect(metrics).To(HaveLen(2)) + + Expect(metrics[0].AppId).To(Equal("app-id")) + Expect(metrics[0].InstanceIndex).To(Equal(uint32(0))) + Expect(metrics[0].Name).To(Equal("throughput")) + Expect(metrics[0].Unit).To(Equal("rps")) + Expect(metrics[0].Value).To(Equal("123")) + + Expect(metrics[1].AppId).To(Equal("app-id")) + Expect(metrics[1].InstanceIndex).To(Equal(uint32(1))) + Expect(metrics[1].Name).To(Equal("throughput")) + Expect(metrics[1].Unit).To(Equal("rps")) + Expect(metrics[1].Value).To(Equal("321")) + }) + }) + }) + + When("reading responsetime metrics", func() { + When("PromQL API returns a vector with no samples", func() { + It("returns empty metrics", func() { + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{}, + }, + }, nil) + + metrics, err := logCacheClient.GetMetrics("app-id", "responsetime", startTime, endTime) + + Expect(err).To(Not(HaveOccurred())) + Expect(metrics).To(HaveLen(1)) + + Expect(metrics[0].AppId).To(Equal("app-id")) + Expect(metrics[0].InstanceIndex).To(Equal(uint32(0))) + Expect(metrics[0].Name).To(Equal("responsetime")) + Expect(metrics[0].Unit).To(Equal("ms")) + Expect(metrics[0].Value).To(Equal("0")) + }) + }) + + When("promql api returns a vector with samples", func() { + It("reads from PromQL API ", func() { + fakeEnvelopeProcessor.GetCollectionIntervalReturns(40 * time.Second) + fakeGoLogCacheReader.PromQLReturns(&logcache_v1.PromQL_InstantQueryResult{ + Result: &logcache_v1.PromQL_InstantQueryResult_Vector{ + Vector: &logcache_v1.PromQL_Vector{ + Samples: []*logcache_v1.PromQL_Sample{ + { + Metric: map[string]string{ + "instance_id": "0", + }, + Point: &logcache_v1.PromQL_Point{ + Value: 200, + }, + }, + { + Metric: map[string]string{ + "instance_id": "1", + }, + Point: &logcache_v1.PromQL_Point{ + Value: 300, + }, + }, + }, + }, + }, + }, nil) + + metrics, err := logCacheClient.GetMetrics("app-id", "responsetime", startTime, endTime) + + _, query, _ := fakeGoLogCacheReader.PromQLArgsForCall(0) + Expect(query).To(Equal("avg by (instance_id) (max_over_time(http{source_id='app-id'}[40s])) / (1000 * 1000)")) + + Expect(err).To(Not(HaveOccurred())) + Expect(metrics).To(HaveLen(2)) + + Expect(metrics[0].AppId).To(Equal("app-id")) + Expect(metrics[0].InstanceIndex).To(Equal(uint32(0))) + Expect(metrics[0].Name).To(Equal("responsetime")) + Expect(metrics[0].Unit).To(Equal("ms")) + Expect(metrics[0].Value).To(Equal("200")) + + Expect(metrics[1].AppId).To(Equal("app-id")) + Expect(metrics[1].InstanceIndex).To(Equal(uint32(1))) + Expect(metrics[1].Name).To(Equal("responsetime")) + Expect(metrics[1].Unit).To(Equal("ms")) + Expect(metrics[1].Value).To(Equal("300")) + }) + }) + }) DescribeTable("GetMetrics for Gauge Metrics", func(autoscalerMetricType string, requiredFilters []string) { @@ -300,8 +484,6 @@ var _ = Describe("LogCacheClient", func() { // after starTime and envelopeType we filter the metric names Expect(valuesFrom(readOptions[2])["name_filter"][0]).To(Equal(requiredFilters[2])) - Expect(fakeEnvelopeProcessor.GetTimerMetricsCallCount()).To(Equal(0)) - By("Sends the right arguments to the gauge processor") actualEnvelopes, actualCurrentTimestamp := fakeEnvelopeProcessor.GetGaugeMetricsArgsForCall(0) Expect(actualEnvelopes).To(Equal(envelopes))