fix(eventgenerator): use PromQL API for metric types throughput and responsetime #2890

Merged · 16 commits · Apr 26, 2024
Changes from 8 commits
79 changes: 65 additions & 14 deletions src/acceptance/app/dynamic_policy_test.go
@@ -4,6 +4,7 @@ import (
"acceptance"
. "acceptance/helpers"
"fmt"
"sync"
"time"

cfh "github.com/cloudfoundry/cf-test-helpers/v2/helpers"
@@ -19,6 +20,7 @@ var _ = Describe("AutoScaler dynamic policy", func() {
doneAcceptChan chan bool
ticker *time.Ticker
maxHeapLimitMb int
wg sync.WaitGroup
)

const minimalMemoryUsage = 28 // observed minimal memory usage by the test app
@@ -30,6 +32,8 @@ var _ = Describe("AutoScaler dynamic policy", func() {
Expect(err).NotTo(HaveOccurred())
StartApp(appName, cfg.CfPushTimeoutDuration())
instanceName = CreatePolicy(cfg, appName, appGUID, policy)

wg = sync.WaitGroup{}
})
BeforeEach(func() {
maxHeapLimitMb = cfg.NodeMemoryLimit - minimalMemoryUsage
@@ -105,12 +109,14 @@ var _ = Describe("AutoScaler dynamic policy", func() {
Context("when responsetime is greater than scaling out threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleOutPolicy(1, 2, "responsetime", 2000)
policy = GenerateDynamicScaleOutPolicy(1, 2, "responsetime", 800)
initialInstanceCount = 1
})

JustBeforeEach(func() {
ticker = time.NewTicker(10 * time.Second)
// simulate ongoing ~20 slow requests per second
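// each request to /responsetime/slow/1000 responds in roughly 1000 ms, well above the 800 ms scale-out threshold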
ticker = time.NewTicker(50 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/slow/1000", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -120,7 +126,11 @@
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/slow/3000", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
@@ -129,17 +139,25 @@
It("should scale out", Label(acceptance.LabelSmokeTests), func() {
WaitForNInstancesRunning(appGUID, 2, 5*time.Minute)
})

AfterEach(func() {
// Ginkgo requires all goroutines to be finished before reporting the result.
// Wait for all curl-executing goroutines to return.
wg.Wait()
})
})

Context("when responsetime is less than scaling in threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleInPolicy(1, 2, "responsetime", 1000)
policy = GenerateDynamicScaleInPolicyBetween("responsetime", 250, 350)
initialInstanceCount = 2
})

JustBeforeEach(func() {
ticker = time.NewTicker(2 * time.Second)
// simulate ongoing ~20 fast requests per second
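// each request to /responsetime/300 responds in roughly 300 ms, inside the 250-350 ms scale-in band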
ticker = time.NewTicker(50 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/300", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -149,7 +167,11 @@
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
@@ -158,12 +180,17 @@
It("should scale in", func() {
WaitForNInstancesRunning(appGUID, 1, 5*time.Minute)
})

AfterEach(func() {
// Ginkgo requires all goroutines to be finished before reporting the result.
// Wait for all curl-executing goroutines to return.
wg.Wait()
})
})

})

Context("when scaling by throughput", func() {

JustBeforeEach(func() {
doneChan = make(chan bool)
doneAcceptChan = make(chan bool)
@@ -177,12 +204,14 @@
Context("when throughput is greater than scaling out threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 1)
policy = GenerateDynamicScaleOutPolicy(1, 2, "throughput", 15)
initialInstanceCount = 1
})

JustBeforeEach(func() {
ticker = time.NewTicker(25 * time.Millisecond)
// simulate ongoing ~20 requests per second
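// ~20 requests per second is above the scale-out threshold of 15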
ticker = time.NewTicker(50 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -192,7 +221,11 @@
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
@@ -202,17 +235,25 @@
WaitForNInstancesRunning(appGUID, 2, 5*time.Minute)
})

AfterEach(func() {
// Ginkgo requires all goroutines to be finished before reporting the result.
// Wait for all curl-executing goroutines to return.
wg.Wait()
})

})

Context("when throughput is less than scaling in threshold", func() {

BeforeEach(func() {
policy = GenerateDynamicScaleInPolicy(1, 2, "throughput", 100)
policy = GenerateDynamicScaleInPolicyBetween("throughput", 15, 25)
initialInstanceCount = 2
})

JustBeforeEach(func() {
ticker = time.NewTicker(10 * time.Second)
// simulate ongoing ~20 requests per second
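// ~20 requests per second lies inside the 15-25 band of the scale-in policy above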
ticker = time.NewTicker(50 * time.Millisecond)
appUri := cfh.AppUri(appName, "/responsetime/fast", cfg)
go func(chan bool) {
defer GinkgoRecover()
for {
@@ -222,16 +263,26 @@
doneAcceptChan <- true
return
case <-ticker.C:
cfh.CurlApp(cfg, appName, "/responsetime/fast", "-f")
wg.Add(1)
go func() {
defer wg.Done()
cfh.Curl(cfg, appUri)
}()
}
}
}(doneChan)
})

It("should scale in", func() {
WaitForNInstancesRunning(appGUID, 1, 5*time.Minute)
})
})

AfterEach(func() {
// Ginkgo requires all goroutines to be finished before reporting the result.
// Wait for all curl-executing goroutines to return.
wg.Wait()
})
})
})

// To check existing aggregated cpu metrics do: cf asm APP_NAME cpu
44 changes: 44 additions & 0 deletions src/acceptance/helpers/helpers.go
@@ -276,6 +276,50 @@ func GenerateDynamicScaleOutAndInPolicy(instanceMin, instanceMax int, metricName
return string(marshaled)
}

func GenerateDynamicScaleInPolicyBetween(metricName string, scaleInLowerThreshold int64, scaleInUpperThreshold int64) string {
// Example with lower threshold 10 and upper threshold 30:
//   metric < 10  => +1 => don't scale in while below the lower threshold
//   metric < 30  => -1 => scale in while below the upper threshold
//   metric >= 30 => +1 => don't scale in at or above the upper threshold
noDownscalingWhenBelowLower := ScalingRule{
MetricType: metricName,
BreachDurationSeconds: TestBreachDurationSeconds,
Threshold: scaleInLowerThreshold,
Operator: "<",
CoolDownSeconds: TestCoolDownSeconds,
Adjustment: "+1",
}

downscalingWhenBelowUpper := ScalingRule{
MetricType: metricName,
BreachDurationSeconds: TestBreachDurationSeconds,
Threshold: scaleInUpperThreshold,
Operator: "<",
CoolDownSeconds: TestCoolDownSeconds,
Adjustment: "-1",
}

noDownscalingWhenAboveUpper := ScalingRule{
MetricType: metricName,
BreachDurationSeconds: TestBreachDurationSeconds,
Threshold: scaleInUpperThreshold,
Operator: ">=",
CoolDownSeconds: TestCoolDownSeconds,
Adjustment: "+1",
}

policy := ScalingPolicy{
InstanceMin: 1,
InstanceMax: 2,
ScalingRules: []*ScalingRule{&noDownscalingWhenBelowLower, &downscalingWhenBelowUpper, &noDownscalingWhenAboveUpper},
}

marshaled, err := MarshalWithoutHTMLEscape(policy)
Expect(err).NotTo(HaveOccurred())

	return string(marshaled)
}
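
For orientation, a minimal usage sketch of the new helper (not part of this diff; the thresholds mirror the throughput scale-in test above, and the comments merely restate the three rules the function builds):

policy := GenerateDynamicScaleInPolicyBetween("throughput", 15, 25)
// The marshaled policy uses InstanceMin 1 and InstanceMax 2 and contains three rules:
//   metric < 15  => "+1" => no scale-in below the lower threshold
//   metric < 25  => "-1" => scale in while below the upper threshold
//   metric >= 25 => "+1" => no scale-in at or above the upper threshold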

func GenerateSpecificDateSchedulePolicy(startDateTime, endDateTime time.Time, scheduledInstanceMin, scheduledInstanceMax, scheduledInstanceInit int) string {
scalingInRule := ScalingRule{
MetricType: "cpu",
13 changes: 6 additions & 7 deletions src/autoscaler/envelopeprocessor/envelope_processor.go
@@ -20,7 +20,7 @@ type EnvelopeProcessorCreator interface {

type EnvelopeProcessor interface {
GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, currentTimeStamp int64) ([]models.AppInstanceMetric, error)
GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric
GetCollectionInterval() time.Duration
}

var _ EnvelopeProcessor = &Processor{}
@@ -43,10 +43,9 @@ func (p Processor) GetGaugeMetrics(envelopes []*loggregator_v2.Envelope, current
p.logger.Debug("Compacted envelopes", lager.Data{"compactedEnvelopes": compactedEnvelopes})
return GetGaugeInstanceMetrics(compactedEnvelopes, currentTimeStamp)
}
func (p Processor) GetTimerMetrics(envelopes []*loggregator_v2.Envelope, appID string, currentTimestamp int64) []models.AppInstanceMetric {
p.logger.Debug("GetTimerMetrics")
p.logger.Debug("Compacted envelopes", lager.Data{"Envelopes": envelopes})
return GetHttpStartStopInstanceMetrics(envelopes, appID, currentTimestamp, p.collectionInterval)

func (p Processor) GetCollectionInterval() time.Duration {
return p.collectionInterval
}

// Log Cache returns instance metrics such as cpu and memory in separate envelopes; this was not the case with
@@ -91,10 +90,10 @@ func GetHttpStartStopInstanceMetrics(envelopes []*loggregator_v2.Envelope, appID
var metrics []models.AppInstanceMetric

numRequestsPerAppIdx := calcNumReqs(envelopes)
sumReponseTimesPerAppIdx := calcSumResponseTimes(envelopes)
Contributor Author: unrelated: typo fix
sumResponseTimesPerAppIdx := calcSumResponseTimes(envelopes)

throughputMetrics := getThroughputInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, collectionInterval, currentTimestamp)
responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumReponseTimesPerAppIdx, currentTimestamp)
Contributor Author: unrelated: typo fix
responseTimeMetric := getResponsetimeInstanceMetrics(envelopes, appID, numRequestsPerAppIdx, sumResponseTimesPerAppIdx, currentTimestamp)

metrics = append(metrics, throughputMetrics...)
metrics = append(metrics, responseTimeMetric...)
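
Context for the interface change above: per the PR title, the eventgenerator now obtains the throughput and responsetime metric types through Log Cache's PromQL API rather than by processing raw HTTP timer envelopes, which is why GetTimerMetrics disappears from the EnvelopeProcessor interface and only the collection interval remains exposed. The sketch below only illustrates the kind of PromQL such a client could issue; the exact queries are not part of this diff, and the metric name "http", the labels "source_id"/"instance_id", and the nanosecond unit of the stored timer value are assumptions.

package sketch

import (
	"fmt"
	"time"
)

// buildPromQLQueries is a hypothetical helper, not code from this PR. It sketches
// the kind of Log Cache PromQL the eventgenerator could issue for the two metric
// types, using the collection interval now exposed via GetCollectionInterval().
func buildPromQLQueries(appID string, collectionInterval time.Duration) (throughput, responseTime string) {
	secs := collectionInterval.Seconds()

	// throughput: requests per second per app instance over one collection interval
	throughput = fmt.Sprintf(
		`sum by (instance_id) (count_over_time(http{source_id=%q}[%.0fs])) / %.0f`,
		appID, secs, secs)

	// responsetime: average request duration per instance over the same window,
	// converted to milliseconds (assuming the stored timer value is in nanoseconds)
	responseTime = fmt.Sprintf(
		`avg by (instance_id) (avg_over_time(http{source_id=%q}[%.0fs])) / (1000 * 1000)`,
		appID, secs)

	return throughput, responseTime
}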